]> Cypherpunks.ru repositories - nncp.git/blob - src/sp.go
Additional public keys existence checks
[nncp.git] / src / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bytes"
22         "crypto/subtle"
23         "errors"
24         "fmt"
25         "io"
26         "os"
27         "path/filepath"
28         "sort"
29         "sync"
30         "time"
31
32         xdr "github.com/davecgh/go-xdr/xdr2"
33         "github.com/dustin/go-humanize"
34         "github.com/flynn/noise"
35 )
36
37 const (
38         MaxSPSize      = 1<<16 - 256
39         PartSuffix     = ".part"
40         SPHeadOverhead = 4
41 )
42
43 type MTHAndOffset struct {
44         mth    MTH
45         offset uint64
46 }
47
48 type SPCheckerTask struct {
49         nodeId *NodeId
50         hsh    *[MTHSize]byte
51         mth    MTH
52         done   chan []byte
53 }
54
55 var (
56         SPInfoOverhead    int
57         SPFreqOverhead    int
58         SPFileOverhead    int
59         SPHaltMarshalized []byte
60         SPPingMarshalized []byte
61
62         NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
63                 noise.DH25519,
64                 noise.CipherChaChaPoly,
65                 noise.HashBLAKE2b,
66         )
67
68         DefaultDeadline = 10 * time.Second
69         PingTimeout     = time.Minute
70
71         spCheckerTasks chan SPCheckerTask
72         SPCheckerWg    sync.WaitGroup
73         spCheckerOnce  sync.Once
74 )
75
76 type FdAndFullSize struct {
77         fd       *os.File
78         fullSize int64
79 }
80
81 type SPType uint8
82
83 const (
84         SPTypeInfo SPType = iota
85         SPTypeFreq SPType = iota
86         SPTypeFile SPType = iota
87         SPTypeDone SPType = iota
88         SPTypeHalt SPType = iota
89         SPTypePing SPType = iota
90 )
91
92 type SPHead struct {
93         Type SPType
94 }
95
96 type SPInfo struct {
97         Nice uint8
98         Size uint64
99         Hash *[MTHSize]byte
100 }
101
102 type SPFreq struct {
103         Hash   *[MTHSize]byte
104         Offset uint64
105 }
106
107 type SPFile struct {
108         Hash    *[MTHSize]byte
109         Offset  uint64
110         Payload []byte
111 }
112
113 type SPDone struct {
114         Hash *[MTHSize]byte
115 }
116
117 type SPRaw struct {
118         Magic   [8]byte
119         Payload []byte
120 }
121
122 type FreqWithNice struct {
123         freq *SPFreq
124         nice uint8
125 }
126
127 type ConnDeadlined interface {
128         io.ReadWriteCloser
129         SetReadDeadline(t time.Time) error
130         SetWriteDeadline(t time.Time) error
131 }
132
133 func init() {
134         var buf bytes.Buffer
135         spHead := SPHead{Type: SPTypeHalt}
136         if _, err := xdr.Marshal(&buf, spHead); err != nil {
137                 panic(err)
138         }
139         SPHaltMarshalized = make([]byte, SPHeadOverhead)
140         copy(SPHaltMarshalized, buf.Bytes())
141         buf.Reset()
142
143         spHead = SPHead{Type: SPTypePing}
144         if _, err := xdr.Marshal(&buf, spHead); err != nil {
145                 panic(err)
146         }
147         SPPingMarshalized = make([]byte, SPHeadOverhead)
148         copy(SPPingMarshalized, buf.Bytes())
149         buf.Reset()
150
151         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([MTHSize]byte)}
152         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
153                 panic(err)
154         }
155         SPInfoOverhead = buf.Len()
156         buf.Reset()
157
158         spFreq := SPFreq{Hash: new([MTHSize]byte), Offset: 123}
159         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
160                 panic(err)
161         }
162         SPFreqOverhead = buf.Len()
163         buf.Reset()
164
165         spFile := SPFile{Hash: new([MTHSize]byte), Offset: 123}
166         if _, err := xdr.Marshal(&buf, spFile); err != nil {
167                 panic(err)
168         }
169         SPFileOverhead = buf.Len()
170         spCheckerTasks = make(chan SPCheckerTask)
171 }
172
173 func MarshalSP(typ SPType, sp interface{}) []byte {
174         var buf bytes.Buffer
175         if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil {
176                 panic(err)
177         }
178         if _, err := xdr.Marshal(&buf, sp); err != nil {
179                 panic(err)
180         }
181         return buf.Bytes()
182 }
183
184 func payloadsSplit(payloads [][]byte) [][]byte {
185         var outbounds [][]byte
186         outbound := make([]byte, 0, MaxSPSize)
187         for i, payload := range payloads {
188                 outbound = append(outbound, payload...)
189                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
190                         outbounds = append(outbounds, outbound)
191                         outbound = make([]byte, 0, MaxSPSize)
192                 }
193         }
194         if len(outbound) > 0 {
195                 outbounds = append(outbounds, outbound)
196         }
197         return outbounds
198 }
199
200 type SPState struct {
201         Ctx            *Ctx
202         Node           *Node
203         Nice           uint8
204         NoCK           bool
205         onlineDeadline time.Duration
206         maxOnlineTime  time.Duration
207         hs             *noise.HandshakeState
208         csOur          *noise.CipherState
209         csTheir        *noise.CipherState
210         payloads       chan []byte
211         pings          chan struct{}
212         infosTheir     map[[MTHSize]byte]*SPInfo
213         infosOurSeen   map[[MTHSize]byte]uint8
214         queueTheir     []*FreqWithNice
215         wg             sync.WaitGroup
216         RxBytes        int64
217         RxLastSeen     time.Time
218         RxLastNonPing  time.Time
219         TxBytes        int64
220         TxLastSeen     time.Time
221         TxLastNonPing  time.Time
222         started        time.Time
223         mustFinishAt   time.Time
224         Duration       time.Duration
225         RxSpeed        int64
226         TxSpeed        int64
227         rxLock         *os.File
228         txLock         *os.File
229         xxOnly         TRxTx
230         rxRate         int
231         txRate         int
232         isDead         chan struct{}
233         listOnly       bool
234         onlyPkts       map[[MTHSize]byte]bool
235         writeSPBuf     bytes.Buffer
236         fds            map[string]FdAndFullSize
237         fdsLock        sync.RWMutex
238         fileHashers    map[string]*MTHAndOffset
239         progressBars   map[string]struct{}
240         sync.RWMutex
241 }
242
243 func (state *SPState) SetDead() {
244         state.Lock()
245         defer state.Unlock()
246         select {
247         case <-state.isDead:
248                 // Already closed channel, dead
249                 return
250         default:
251         }
252         close(state.isDead)
253         go func() {
254                 for range state.payloads {
255                 }
256         }()
257         go func() {
258                 for range state.pings {
259                 }
260         }()
261 }
262
263 func (state *SPState) NotAlive() bool {
264         select {
265         case <-state.isDead:
266                 return true
267         default:
268         }
269         return false
270 }
271
272 func (state *SPState) dirUnlock() {
273         state.Ctx.UnlockDir(state.rxLock)
274         state.Ctx.UnlockDir(state.txLock)
275 }
276
277 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
278         state.writeSPBuf.Reset()
279         n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
280                 Magic:   MagicNNCPSv1.B,
281                 Payload: payload,
282         })
283         if err != nil {
284                 return err
285         }
286         if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil {
287                 state.TxLastSeen = time.Now()
288                 state.TxBytes += int64(n)
289                 if !ping {
290                         state.TxLastNonPing = state.TxLastSeen
291                 }
292         }
293         return err
294 }
295
296 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
297         var sp SPRaw
298         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
299         if err != nil {
300                 ue := err.(*xdr.UnmarshalError)
301                 if ue.Err == io.EOF {
302                         return nil, ue.Err
303                 }
304                 return nil, err
305         }
306         state.RxLastSeen = time.Now()
307         state.RxBytes += int64(n)
308         if sp.Magic != MagicNNCPSv1.B {
309                 return nil, BadMagic
310         }
311         return sp.Payload, nil
312 }
313
314 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[MTHSize]byte]uint8) [][]byte {
315         var infos []*SPInfo
316         var totalSize int64
317         for job := range ctx.Jobs(nodeId, TTx) {
318                 if job.PktEnc.Nice > nice {
319                         continue
320                 }
321                 if _, known := (*seen)[*job.HshValue]; known {
322                         continue
323                 }
324                 totalSize += job.Size
325                 infos = append(infos, &SPInfo{
326                         Nice: job.PktEnc.Nice,
327                         Size: uint64(job.Size),
328                         Hash: job.HshValue,
329                 })
330                 (*seen)[*job.HshValue] = job.PktEnc.Nice
331         }
332         sort.Sort(ByNice(infos))
333         var payloads [][]byte
334         for _, info := range infos {
335                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
336                 pktName := Base32Codec.EncodeToString(info.Hash[:])
337                 ctx.LogD("sp-info-our", LEs{
338                         {"Node", nodeId},
339                         {"Name", pktName},
340                         {"Size", info.Size},
341                 }, func(les LEs) string {
342                         return fmt.Sprintf(
343                                 "Our info: %s/tx/%s (%s)",
344                                 ctx.NodeName(nodeId),
345                                 pktName,
346                                 humanize.IBytes(info.Size),
347                         )
348                 })
349         }
350         if totalSize > 0 {
351                 ctx.LogI("sp-infos-tx", LEs{
352                         {"XX", string(TTx)},
353                         {"Node", nodeId},
354                         {"Pkts", len(payloads)},
355                         {"Size", totalSize},
356                 }, func(les LEs) string {
357                         return fmt.Sprintf(
358                                 "We have got for %s: %d packets, %s",
359                                 ctx.NodeName(nodeId),
360                                 len(payloads),
361                                 humanize.IBytes(uint64(totalSize)),
362                         )
363                 })
364         }
365         return payloadsSplit(payloads)
366 }
367
368 func (state *SPState) StartI(conn ConnDeadlined) error {
369         nodeId := state.Node.Id
370         err := state.Ctx.ensureRxDir(nodeId)
371         if err != nil {
372                 return err
373         }
374         var rxLock *os.File
375         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
376                 rxLock, err = state.Ctx.LockDir(nodeId, string(TRx))
377                 if err != nil {
378                         return err
379                 }
380         }
381         var txLock *os.File
382         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
383                 txLock, err = state.Ctx.LockDir(nodeId, string(TTx))
384                 if err != nil {
385                         return err
386                 }
387         }
388         started := time.Now()
389         conf := noise.Config{
390                 CipherSuite: NoiseCipherSuite,
391                 Pattern:     noise.HandshakeIK,
392                 Initiator:   true,
393                 StaticKeypair: noise.DHKey{
394                         Private: state.Ctx.Self.NoisePrv[:],
395                         Public:  state.Ctx.Self.NoisePub[:],
396                 },
397                 PeerStatic: state.Node.NoisePub[:],
398         }
399         hs, err := noise.NewHandshakeState(conf)
400         if err != nil {
401                 return err
402         }
403         state.hs = hs
404         state.payloads = make(chan []byte)
405         state.pings = make(chan struct{})
406         state.infosTheir = make(map[[MTHSize]byte]*SPInfo)
407         state.infosOurSeen = make(map[[MTHSize]byte]uint8)
408         state.progressBars = make(map[string]struct{})
409         state.started = started
410         state.rxLock = rxLock
411         state.txLock = txLock
412
413         var infosPayloads [][]byte
414         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
415                 infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
416         }
417         var firstPayload []byte
418         if len(infosPayloads) > 0 {
419                 firstPayload = infosPayloads[0]
420         }
421         // Pad first payload, to hide actual number of existing files
422         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
423                 firstPayload = append(firstPayload, SPHaltMarshalized...)
424         }
425
426         var buf []byte
427         var payload []byte
428         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
429         if err != nil {
430                 state.dirUnlock()
431                 return err
432         }
433         les := LEs{{"Node", nodeId}, {"Nice", int(state.Nice)}}
434         state.Ctx.LogD("sp-startI", les, func(les LEs) string {
435                 return fmt.Sprintf(
436                         "SP with %s (nice %s): sending first message",
437                         state.Node.Name,
438                         NicenessFmt(state.Nice),
439                 )
440         })
441         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
442         if err = state.WriteSP(conn, buf, false); err != nil {
443                 state.Ctx.LogE("sp-startI", les, err, func(les LEs) string {
444                         return fmt.Sprintf(
445                                 "SP with %s (nice %s): writing",
446                                 state.Node.Name,
447                                 NicenessFmt(state.Nice),
448                         )
449                 })
450                 state.dirUnlock()
451                 return err
452         }
453         state.Ctx.LogD("sp-startI-wait", les, func(les LEs) string {
454                 return fmt.Sprintf(
455                         "SP with %s (nice %s): waiting for first message",
456                         state.Node.Name,
457                         NicenessFmt(state.Nice),
458                 )
459         })
460         conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
461         if buf, err = state.ReadSP(conn); err != nil {
462                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
463                         return fmt.Sprintf(
464                                 "SP with %s (nice %s): reading",
465                                 state.Node.Name,
466                                 NicenessFmt(state.Nice),
467                         )
468                 })
469                 state.dirUnlock()
470                 return err
471         }
472         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
473         if err != nil {
474                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
475                         return fmt.Sprintf(
476                                 "SP with %s (nice %s): reading Noise message",
477                                 state.Node.Name,
478                                 NicenessFmt(state.Nice),
479                         )
480                 })
481                 state.dirUnlock()
482                 return err
483         }
484         state.Ctx.LogD("sp-startI-workers", les, func(les LEs) string {
485                 return fmt.Sprintf(
486                         "SP with %s (nice %s): starting workers",
487                         state.Node.Name,
488                         NicenessFmt(state.Nice),
489                 )
490         })
491         err = state.StartWorkers(conn, infosPayloads, payload)
492         if err != nil {
493                 state.Ctx.LogE("sp-startI-workers", les, err, func(les LEs) string {
494                         return fmt.Sprintf(
495                                 "SP with %s (nice %s): starting workers",
496                                 state.Node.Name,
497                                 NicenessFmt(state.Nice),
498                         )
499                 })
500                 state.dirUnlock()
501         }
502         return err
503 }
504
505 func (state *SPState) StartR(conn ConnDeadlined) error {
506         started := time.Now()
507         conf := noise.Config{
508                 CipherSuite: NoiseCipherSuite,
509                 Pattern:     noise.HandshakeIK,
510                 Initiator:   false,
511                 StaticKeypair: noise.DHKey{
512                         Private: state.Ctx.Self.NoisePrv[:],
513                         Public:  state.Ctx.Self.NoisePub[:],
514                 },
515         }
516         hs, err := noise.NewHandshakeState(conf)
517         if err != nil {
518                 return err
519         }
520         xxOnly := TRxTx("")
521         state.hs = hs
522         state.payloads = make(chan []byte)
523         state.pings = make(chan struct{})
524         state.infosOurSeen = make(map[[MTHSize]byte]uint8)
525         state.infosTheir = make(map[[MTHSize]byte]*SPInfo)
526         state.progressBars = make(map[string]struct{})
527         state.started = started
528         state.xxOnly = xxOnly
529
530         var buf []byte
531         var payload []byte
532         logMsg := func(les LEs) string {
533                 return fmt.Sprintf(
534                         "SP nice %s: waiting for first message",
535                         NicenessFmt(state.Nice),
536                 )
537         }
538         les := LEs{{"Nice", int(state.Nice)}}
539         state.Ctx.LogD("sp-startR", les, logMsg)
540         conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
541         if buf, err = state.ReadSP(conn); err != nil {
542                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
543                 return err
544         }
545         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
546                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
547                 return err
548         }
549
550         var node *Node
551         for _, n := range state.Ctx.Neigh {
552                 if n.NoisePub == nil {
553                         continue
554                 }
555                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
556                         node = n
557                         break
558                 }
559         }
560         if node == nil {
561                 peerId := Base32Codec.EncodeToString(state.hs.PeerStatic())
562                 err = errors.New("unknown peer: " + peerId)
563                 state.Ctx.LogE("sp-startR-unknown", append(les, LE{"Peer", peerId}), err, logMsg)
564                 return err
565         }
566         state.Node = node
567         state.rxRate = node.RxRate
568         state.txRate = node.TxRate
569         state.onlineDeadline = node.OnlineDeadline
570         state.maxOnlineTime = node.MaxOnlineTime
571         les = LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}}
572
573         if err = state.Ctx.ensureRxDir(node.Id); err != nil {
574                 return err
575         }
576         var rxLock *os.File
577         if xxOnly == "" || xxOnly == TRx {
578                 rxLock, err = state.Ctx.LockDir(node.Id, string(TRx))
579                 if err != nil {
580                         return err
581                 }
582         }
583         state.rxLock = rxLock
584         var txLock *os.File
585         if xxOnly == "" || xxOnly == TTx {
586                 txLock, err = state.Ctx.LockDir(node.Id, string(TTx))
587                 if err != nil {
588                         return err
589                 }
590         }
591         state.txLock = txLock
592
593         var infosPayloads [][]byte
594         if xxOnly == "" || xxOnly == TTx {
595                 infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
596         }
597         var firstPayload []byte
598         if len(infosPayloads) > 0 {
599                 firstPayload = infosPayloads[0]
600         }
601         // Pad first payload, to hide actual number of existing files
602         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
603                 firstPayload = append(firstPayload, SPHaltMarshalized...)
604         }
605
606         state.Ctx.LogD("sp-startR-write", les, func(les LEs) string {
607                 return fmt.Sprintf(
608                         "SP with %s (nice %s): sending first message",
609                         node.Name, NicenessFmt(state.Nice),
610                 )
611         })
612         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
613         if err != nil {
614                 state.dirUnlock()
615                 return err
616         }
617         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
618         if err = state.WriteSP(conn, buf, false); err != nil {
619                 state.Ctx.LogE("sp-startR-write", les, err, func(les LEs) string {
620                         return fmt.Sprintf(
621                                 "SP with %s (nice %s): writing",
622                                 node.Name, NicenessFmt(state.Nice),
623                         )
624                 })
625                 state.dirUnlock()
626                 return err
627         }
628         state.Ctx.LogD("sp-startR-workers", les, func(les LEs) string {
629                 return fmt.Sprintf(
630                         "SP with %s (nice %s): starting workers",
631                         node.Name, NicenessFmt(state.Nice),
632                 )
633         })
634         err = state.StartWorkers(conn, infosPayloads, payload)
635         if err != nil {
636                 state.dirUnlock()
637         }
638         return err
639 }
640
641 func (state *SPState) closeFd(pth string) {
642         state.fdsLock.Lock()
643         if s, exists := state.fds[pth]; exists {
644                 delete(state.fds, pth)
645                 s.fd.Close()
646         }
647         state.fdsLock.Unlock()
648 }
649
650 func (state *SPState) StartWorkers(
651         conn ConnDeadlined,
652         infosPayloads [][]byte,
653         payload []byte,
654 ) error {
655         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
656         state.fds = make(map[string]FdAndFullSize)
657         state.fileHashers = make(map[string]*MTHAndOffset)
658         state.isDead = make(chan struct{})
659         if state.maxOnlineTime > 0 {
660                 state.mustFinishAt = state.started.Add(state.maxOnlineTime)
661         }
662         if !state.NoCK {
663                 spCheckerOnce.Do(func() { go SPChecker(state.Ctx) })
664                 go func() {
665                         for job := range state.Ctx.JobsNoCK(state.Node.Id) {
666                                 if job.PktEnc.Nice <= state.Nice {
667                                         spCheckerTasks <- SPCheckerTask{
668                                                 nodeId: state.Node.Id,
669                                                 hsh:    job.HshValue,
670                                                 done:   state.payloads,
671                                         }
672                                 }
673                         }
674                 }()
675         }
676
677         // Remaining handshake payload sending
678         if len(infosPayloads) > 1 {
679                 state.wg.Add(1)
680                 go func() {
681                         for _, payload := range infosPayloads[1:] {
682                                 state.Ctx.LogD(
683                                         "sp-queue-remaining",
684                                         append(les, LE{"Size", int64(len(payload))}),
685                                         func(les LEs) string {
686                                                 return fmt.Sprintf(
687                                                         "SP with %s (nice %s): queuing remaining payload (%s)",
688                                                         state.Node.Name, NicenessFmt(state.Nice),
689                                                         humanize.IBytes(uint64(len(payload))),
690                                                 )
691                                         },
692                                 )
693                                 state.payloads <- payload
694                         }
695                         state.wg.Done()
696                 }()
697         }
698
699         // Processing of first payload and queueing its responses
700         logMsg := func(les LEs) string {
701                 return fmt.Sprintf(
702                         "SP with %s (nice %s): processing first payload (%s)",
703                         state.Node.Name, NicenessFmt(state.Nice),
704                         humanize.IBytes(uint64(len(payload))),
705                 )
706         }
707         state.Ctx.LogD("sp-process", append(les, LE{"Size", int64(len(payload))}), logMsg)
708         replies, err := state.ProcessSP(payload)
709         if err != nil {
710                 state.Ctx.LogE("sp-process", les, err, logMsg)
711                 return err
712         }
713         state.wg.Add(1)
714         go func() {
715                 for _, reply := range replies {
716                         state.Ctx.LogD(
717                                 "sp-queue-reply",
718                                 append(les, LE{"Size", int64(len(reply))}),
719                                 func(les LEs) string {
720                                         return fmt.Sprintf(
721                                                 "SP with %s (nice %s): queuing reply (%s)",
722                                                 state.Node.Name, NicenessFmt(state.Nice),
723                                                 humanize.IBytes(uint64(len(payload))),
724                                         )
725                                 },
726                         )
727                         state.payloads <- reply
728                 }
729                 state.wg.Done()
730         }()
731
732         // Periodic jobs
733         state.wg.Add(1)
734         go func() {
735                 deadlineTicker := time.NewTicker(time.Second)
736                 pingTicker := time.NewTicker(PingTimeout)
737                 for {
738                         select {
739                         case <-state.isDead:
740                                 state.wg.Done()
741                                 deadlineTicker.Stop()
742                                 pingTicker.Stop()
743                                 return
744                         case now := <-deadlineTicker.C:
745                                 if now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
746                                         now.Sub(state.TxLastNonPing) >= state.onlineDeadline {
747                                         goto Deadlined
748                                 }
749                                 if state.maxOnlineTime > 0 && state.mustFinishAt.Before(now) {
750                                         goto Deadlined
751                                 }
752                                 if now.Sub(state.RxLastSeen) >= 2*PingTimeout {
753                                         goto Deadlined
754                                 }
755                                 break
756                         Deadlined:
757                                 state.SetDead()
758                                 conn.Close()
759                         case now := <-pingTicker.C:
760                                 if now.After(state.TxLastSeen.Add(PingTimeout)) {
761                                         state.wg.Add(1)
762                                         go func() {
763                                                 state.pings <- struct{}{}
764                                                 state.wg.Done()
765                                         }()
766                                 }
767                         }
768                 }
769         }()
770
771         // Spool checker and INFOs sender of appearing files
772         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
773                 state.wg.Add(1)
774                 go func() {
775                         ticker := time.NewTicker(time.Second)
776                         for {
777                                 select {
778                                 case <-state.isDead:
779                                         state.wg.Done()
780                                         ticker.Stop()
781                                         return
782                                 case <-ticker.C:
783                                         for _, payload := range state.Ctx.infosOur(
784                                                 state.Node.Id,
785                                                 state.Nice,
786                                                 &state.infosOurSeen,
787                                         ) {
788                                                 state.Ctx.LogD(
789                                                         "sp-queue-info",
790                                                         append(les, LE{"Size", int64(len(payload))}),
791                                                         func(les LEs) string {
792                                                                 return fmt.Sprintf(
793                                                                         "SP with %s (nice %s): queuing new info (%s)",
794                                                                         state.Node.Name, NicenessFmt(state.Nice),
795                                                                         humanize.IBytes(uint64(len(payload))),
796                                                                 )
797                                                         },
798                                                 )
799                                                 state.payloads <- payload
800                                         }
801                                 }
802                         }
803                 }()
804         }
805
806         // Sender
807         state.wg.Add(1)
808         go func() {
809                 defer conn.Close()
810                 defer state.SetDead()
811                 defer state.wg.Done()
812                 buf := make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
813                 for {
814                         if state.NotAlive() {
815                                 return
816                         }
817                         var payload []byte
818                         var ping bool
819                         select {
820                         case <-state.pings:
821                                 state.Ctx.LogD("sp-got-ping", les, func(les LEs) string {
822                                         return fmt.Sprintf(
823                                                 "SP with %s (nice %s): got ping",
824                                                 state.Node.Name, NicenessFmt(state.Nice),
825                                         )
826                                 })
827                                 payload = SPPingMarshalized
828                                 ping = true
829                         case payload = <-state.payloads:
830                                 state.Ctx.LogD(
831                                         "sp-got-payload",
832                                         append(les, LE{"Size", int64(len(payload))}),
833                                         func(les LEs) string {
834                                                 return fmt.Sprintf(
835                                                         "SP with %s (nice %s): got payload (%s)",
836                                                         state.Node.Name, NicenessFmt(state.Nice),
837                                                         humanize.IBytes(uint64(len(payload))),
838                                                 )
839                                         },
840                                 )
841                         default:
842                                 state.RLock()
843                                 if len(state.queueTheir) == 0 {
844                                         state.RUnlock()
845                                         time.Sleep(100 * time.Millisecond)
846                                         continue
847                                 }
848                                 freq := state.queueTheir[0].freq
849                                 state.RUnlock()
850                                 if state.txRate > 0 {
851                                         time.Sleep(time.Second / time.Duration(state.txRate))
852                                 }
853                                 pktName := Base32Codec.EncodeToString(freq.Hash[:])
854                                 lesp := append(
855                                         les,
856                                         LE{"XX", string(TTx)},
857                                         LE{"Pkt", pktName},
858                                         LE{"Size", int64(freq.Offset)},
859                                 )
860                                 logMsg := func(les LEs) string {
861                                         return fmt.Sprintf(
862                                                 "SP with %s (nice %s): tx/%s (%s)",
863                                                 state.Node.Name, NicenessFmt(state.Nice),
864                                                 pktName,
865                                                 humanize.IBytes(freq.Offset),
866                                         )
867                                 }
868                                 state.Ctx.LogD("sp-queue", lesp, func(les LEs) string {
869                                         return logMsg(les) + ": queueing"
870                                 })
871                                 pth := filepath.Join(
872                                         state.Ctx.Spool,
873                                         state.Node.Id.String(),
874                                         string(TTx),
875                                         Base32Codec.EncodeToString(freq.Hash[:]),
876                                 )
877                                 state.fdsLock.RLock()
878                                 fdAndFullSize, exists := state.fds[pth]
879                                 state.fdsLock.RUnlock()
880                                 if !exists {
881                                         state.Ctx.LogD("sp-queue-open", lesp, func(les LEs) string {
882                                                 return logMsg(les) + ": opening"
883                                         })
884                                         fd, err := os.Open(pth)
885                                         if err != nil {
886                                                 state.Ctx.LogE("sp-queue-open", lesp, err, func(les LEs) string {
887                                                         return logMsg(les) + ": opening"
888                                                 })
889                                                 return
890                                         }
891                                         fi, err := fd.Stat()
892                                         if err != nil {
893                                                 state.Ctx.LogE("sp-queue-stat", lesp, err, func(les LEs) string {
894                                                         return logMsg(les) + ": stating"
895                                                 })
896                                                 return
897                                         }
898                                         fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
899                                         state.fdsLock.Lock()
900                                         state.fds[pth] = fdAndFullSize
901                                         state.fdsLock.Unlock()
902                                 }
903                                 fd := fdAndFullSize.fd
904                                 fullSize := fdAndFullSize.fullSize
905                                 lesp = append(lesp, LE{"FullSize", fullSize})
906                                 var bufRead []byte
907                                 if freq.Offset < uint64(fullSize) {
908                                         state.Ctx.LogD("sp-file-seek", lesp, func(les LEs) string {
909                                                 return logMsg(les) + ": seeking"
910                                         })
911                                         if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
912                                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
913                                                         return logMsg(les) + ": seeking"
914                                                 })
915                                                 return
916                                         }
917                                         n, err := fd.Read(buf)
918                                         if err != nil {
919                                                 state.Ctx.LogE("sp-file-read", lesp, err, func(les LEs) string {
920                                                         return logMsg(les) + ": reading"
921                                                 })
922                                                 return
923                                         }
924                                         bufRead = buf[:n]
925                                         lesp = append(
926                                                 les,
927                                                 LE{"XX", string(TTx)},
928                                                 LE{"Pkt", pktName},
929                                                 LE{"Size", int64(n)},
930                                                 LE{"FullSize", fullSize},
931                                         )
932                                         state.Ctx.LogD("sp-file-read", lesp, func(les LEs) string {
933                                                 return fmt.Sprintf(
934                                                         "%s: read %s",
935                                                         logMsg(les), humanize.IBytes(uint64(n)),
936                                                 )
937                                         })
938                                 } else {
939                                         state.closeFd(pth)
940                                 }
941                                 payload = MarshalSP(SPTypeFile, SPFile{
942                                         Hash:    freq.Hash,
943                                         Offset:  freq.Offset,
944                                         Payload: bufRead,
945                                 })
946                                 ourSize := freq.Offset + uint64(len(bufRead))
947                                 lesp = append(
948                                         les,
949                                         LE{"XX", string(TTx)},
950                                         LE{"Pkt", pktName},
951                                         LE{"Size", int64(ourSize)},
952                                         LE{"FullSize", fullSize},
953                                 )
954                                 if state.Ctx.ShowPrgrs {
955                                         state.progressBars[pktName] = struct{}{}
956                                         Progress("Tx", lesp)
957                                 }
958                                 if ourSize == uint64(fullSize) {
959                                         state.closeFd(pth)
960                                         state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string {
961                                                 return logMsg(les) + ": finished"
962                                         })
963                                         if state.Ctx.ShowPrgrs {
964                                                 delete(state.progressBars, pktName)
965                                         }
966                                 }
967                                 state.Lock()
968                                 for i, q := range state.queueTheir {
969                                         if *q.freq.Hash != *freq.Hash {
970                                                 continue
971                                         }
972                                         if ourSize == uint64(fullSize) {
973                                                 state.queueTheir = append(
974                                                         state.queueTheir[:i],
975                                                         state.queueTheir[i+1:]...,
976                                                 )
977                                         } else {
978                                                 q.freq.Offset = ourSize
979                                         }
980                                         break
981                                 }
982                                 state.Unlock()
983                         }
984                         logMsg := func(les LEs) string {
985                                 return fmt.Sprintf(
986                                         "SP with %s (nice %s): sending %s",
987                                         state.Node.Name, NicenessFmt(state.Nice),
988                                         humanize.IBytes(uint64(len(payload))),
989                                 )
990                         }
991                         state.Ctx.LogD("sp-sending", append(les, LE{"Size", int64(len(payload))}), logMsg)
992                         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
993                         ct, err := state.csOur.Encrypt(nil, nil, payload)
994                         if err != nil {
995                                 state.Ctx.LogE("sp-encrypting", les, err, logMsg)
996                                 return
997                         }
998                         if err := state.WriteSP(conn, ct, ping); err != nil {
999                                 state.Ctx.LogE("sp-sending", les, err, logMsg)
1000                                 return
1001                         }
1002                 }
1003         }()
1004
1005         // Receiver
1006         state.wg.Add(1)
1007         go func() {
1008                 for {
1009                         if state.NotAlive() {
1010                                 break
1011                         }
1012                         logMsg := func(les LEs) string {
1013                                 return fmt.Sprintf(
1014                                         "SP with %s (nice %s): waiting for payload",
1015                                         state.Node.Name, NicenessFmt(state.Nice),
1016                                 )
1017                         }
1018                         state.Ctx.LogD("sp-recv-wait", les, logMsg)
1019                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
1020                         payload, err := state.ReadSP(conn)
1021                         if err != nil {
1022                                 if err == io.EOF {
1023                                         break
1024                                 }
1025                                 unmarshalErr := err.(*xdr.UnmarshalError)
1026                                 if os.IsTimeout(unmarshalErr.Err) {
1027                                         continue
1028                                 }
1029                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
1030                                         break
1031                                 }
1032                                 state.Ctx.LogE("sp-recv-wait", les, err, logMsg)
1033                                 break
1034                         }
1035                         logMsg = func(les LEs) string {
1036                                 return fmt.Sprintf(
1037                                         "SP with %s (nice %s): payload (%s)",
1038                                         state.Node.Name, NicenessFmt(state.Nice),
1039                                         humanize.IBytes(uint64(len(payload))),
1040                                 )
1041                         }
1042                         state.Ctx.LogD(
1043                                 "sp-recv-got",
1044                                 append(les, LE{"Size", int64(len(payload))}),
1045                                 func(les LEs) string { return logMsg(les) + ": got" },
1046                         )
1047                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
1048                         if err != nil {
1049                                 state.Ctx.LogE("sp-recv-got", les, err, func(les LEs) string {
1050                                         return logMsg(les) + ": got"
1051                                 })
1052                                 break
1053                         }
1054                         state.Ctx.LogD(
1055                                 "sp-recv-process",
1056                                 append(les, LE{"Size", int64(len(payload))}),
1057                                 func(les LEs) string {
1058                                         return logMsg(les) + ": processing"
1059                                 },
1060                         )
1061                         replies, err := state.ProcessSP(payload)
1062                         if err != nil {
1063                                 state.Ctx.LogE("sp-recv-process", les, err, func(les LEs) string {
1064                                         return logMsg(les) + ": processing"
1065                                 })
1066                                 break
1067                         }
1068                         state.wg.Add(1)
1069                         go func() {
1070                                 for _, reply := range replies {
1071                                         state.Ctx.LogD(
1072                                                 "sp-recv-reply",
1073                                                 append(les[:len(les)-1], LE{"Size", int64(len(reply))}),
1074                                                 func(les LEs) string {
1075                                                         return fmt.Sprintf(
1076                                                                 "SP with %s (nice %s): queuing reply (%s)",
1077                                                                 state.Node.Name, NicenessFmt(state.Nice),
1078                                                                 humanize.IBytes(uint64(len(reply))),
1079                                                         )
1080                                                 },
1081                                         )
1082                                         state.payloads <- reply
1083                                 }
1084                                 state.wg.Done()
1085                         }()
1086                         if state.rxRate > 0 {
1087                                 time.Sleep(time.Second / time.Duration(state.rxRate))
1088                         }
1089                 }
1090                 state.SetDead()
1091                 state.wg.Done()
1092                 state.SetDead()
1093                 conn.Close()
1094         }()
1095
1096         return nil
1097 }
1098
1099 func (state *SPState) Wait() {
1100         state.wg.Wait()
1101         close(state.payloads)
1102         close(state.pings)
1103         state.Duration = time.Now().Sub(state.started)
1104         state.dirUnlock()
1105         state.RxSpeed = state.RxBytes
1106         state.TxSpeed = state.TxBytes
1107         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
1108         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
1109         if rxDuration > 0 {
1110                 state.RxSpeed = state.RxBytes / rxDuration
1111         }
1112         if txDuration > 0 {
1113                 state.TxSpeed = state.TxBytes / txDuration
1114         }
1115         for _, s := range state.fds {
1116                 s.fd.Close()
1117         }
1118         for pktName := range state.progressBars {
1119                 ProgressKill(pktName)
1120         }
1121 }
1122
1123 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
1124         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
1125         r := bytes.NewReader(payload)
1126         var err error
1127         var replies [][]byte
1128         var infosGot bool
1129         for r.Len() > 0 {
1130                 state.Ctx.LogD("sp-process-unmarshal", les, func(les LEs) string {
1131                         return fmt.Sprintf(
1132                                 "SP with %s (nice %s): unmarshaling header",
1133                                 state.Node.Name, NicenessFmt(state.Nice),
1134                         )
1135                 })
1136                 var head SPHead
1137                 if _, err = xdr.Unmarshal(r, &head); err != nil {
1138                         state.Ctx.LogE("sp-process-unmarshal", les, err, func(les LEs) string {
1139                                 return fmt.Sprintf(
1140                                         "SP with %s (nice %s): unmarshaling header",
1141                                         state.Node.Name, NicenessFmt(state.Nice),
1142                                 )
1143                         })
1144                         return nil, err
1145                 }
1146                 if head.Type != SPTypePing {
1147                         state.RxLastNonPing = state.RxLastSeen
1148                 }
1149                 switch head.Type {
1150                 case SPTypeHalt:
1151                         state.Ctx.LogD(
1152                                 "sp-process-halt",
1153                                 append(les, LE{"Type", "halt"}), func(les LEs) string {
1154                                         return fmt.Sprintf(
1155                                                 "SP with %s (nice %s): got HALT",
1156                                                 state.Node.Name, NicenessFmt(state.Nice),
1157                                         )
1158                                 },
1159                         )
1160                         state.Lock()
1161                         state.queueTheir = nil
1162                         state.Unlock()
1163
1164                 case SPTypePing:
1165                         state.Ctx.LogD(
1166                                 "sp-process-ping",
1167                                 append(les, LE{"Type", "ping"}),
1168                                 func(les LEs) string {
1169                                         return fmt.Sprintf(
1170                                                 "SP with %s (nice %s): got PING",
1171                                                 state.Node.Name, NicenessFmt(state.Nice),
1172                                         )
1173                                 },
1174                         )
1175
1176                 case SPTypeInfo:
1177                         infosGot = true
1178                         lesp := append(les, LE{"Type", "info"})
1179                         state.Ctx.LogD(
1180                                 "sp-process-info-unmarshal", lesp,
1181                                 func(les LEs) string {
1182                                         return fmt.Sprintf(
1183                                                 "SP with %s (nice %s): unmarshaling INFO",
1184                                                 state.Node.Name, NicenessFmt(state.Nice),
1185                                         )
1186                                 },
1187                         )
1188                         var info SPInfo
1189                         if _, err = xdr.Unmarshal(r, &info); err != nil {
1190                                 state.Ctx.LogE(
1191                                         "sp-process-info-unmarshal", lesp, err,
1192                                         func(les LEs) string {
1193                                                 return fmt.Sprintf(
1194                                                         "SP with %s (nice %s): unmarshaling INFO",
1195                                                         state.Node.Name, NicenessFmt(state.Nice),
1196                                                 )
1197                                         },
1198                                 )
1199                                 return nil, err
1200                         }
1201                         pktName := Base32Codec.EncodeToString(info.Hash[:])
1202                         lesp = append(
1203                                 lesp,
1204                                 LE{"Pkt", pktName},
1205                                 LE{"Size", int64(info.Size)},
1206                                 LE{"PktNice", int(info.Nice)},
1207                         )
1208                         logMsg := func(les LEs) string {
1209                                 return fmt.Sprintf(
1210                                         "SP with %s (nice %s): INFO %s (%s) nice %s",
1211                                         state.Node.Name, NicenessFmt(state.Nice),
1212                                         pktName,
1213                                         humanize.IBytes(info.Size),
1214                                         NicenessFmt(info.Nice),
1215                                 )
1216                         }
1217                         if !state.listOnly && info.Nice > state.Nice {
1218                                 state.Ctx.LogD("sp-process-info-too-nice", lesp, func(les LEs) string {
1219                                         return logMsg(les) + ": too nice"
1220                                 })
1221                                 continue
1222                         }
1223                         state.Ctx.LogD("sp-process-info-got", lesp, func(les LEs) string {
1224                                 return logMsg(les) + ": received"
1225                         })
1226                         if !state.listOnly && state.xxOnly == TTx {
1227                                 continue
1228                         }
1229                         state.Lock()
1230                         state.infosTheir[*info.Hash] = &info
1231                         state.Unlock()
1232                         state.Ctx.LogD("sp-process-info-stat", lesp, func(les LEs) string {
1233                                 return logMsg(les) + ": stating part"
1234                         })
1235                         pktPath := filepath.Join(
1236                                 state.Ctx.Spool,
1237                                 state.Node.Id.String(),
1238                                 string(TRx),
1239                                 Base32Codec.EncodeToString(info.Hash[:]),
1240                         )
1241                         logMsg = func(les LEs) string {
1242                                 return fmt.Sprintf(
1243                                         "Packet %s (%s) (nice %s)",
1244                                         pktName,
1245                                         humanize.IBytes(info.Size),
1246                                         NicenessFmt(info.Nice),
1247                                 )
1248                         }
1249                         if _, err = os.Stat(pktPath); err == nil {
1250                                 state.Ctx.LogI("sp-info-done", lesp, func(les LEs) string {
1251                                         return logMsg(les) + ": already done"
1252                                 })
1253                                 if !state.listOnly {
1254                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1255                                 }
1256                                 continue
1257                         }
1258                         if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
1259                                 state.Ctx.LogI("sp-info-seen", lesp, func(les LEs) string {
1260                                         return logMsg(les) + ": already seen"
1261                                 })
1262                                 if !state.listOnly {
1263                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1264                                 }
1265                                 continue
1266                         }
1267                         if _, err = os.Stat(pktPath + NoCKSuffix); err == nil {
1268                                 state.Ctx.LogI("sp-info-nock", lesp, func(les LEs) string {
1269                                         return logMsg(les) + ": still not checksummed"
1270                                 })
1271                                 continue
1272                         }
1273                         fi, err := os.Stat(pktPath + PartSuffix)
1274                         var offset int64
1275                         if err == nil {
1276                                 offset = fi.Size()
1277                         }
1278                         if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
1279                                 state.Ctx.LogI("sp-info-no-space", lesp, func(les LEs) string {
1280                                         return logMsg(les) + ": not enough space"
1281                                 })
1282                                 continue
1283                         }
1284                         state.Ctx.LogI(
1285                                 "sp-info",
1286                                 append(lesp, LE{"Offset", offset}),
1287                                 func(les LEs) string {
1288                                         return fmt.Sprintf(
1289                                                 "%s: %d%%", logMsg(les), 100*uint64(offset)/info.Size,
1290                                         )
1291                                 },
1292                         )
1293                         if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
1294                                 replies = append(replies, MarshalSP(
1295                                         SPTypeFreq,
1296                                         SPFreq{info.Hash, uint64(offset)},
1297                                 ))
1298                         }
1299
1300                 case SPTypeFile:
1301                         lesp := append(les, LE{"Type", "file"})
1302                         state.Ctx.LogD("sp-process-file", lesp, func(les LEs) string {
1303                                 return fmt.Sprintf(
1304                                         "SP with %s (nice %s): unmarshaling FILE",
1305                                         state.Node.Name, NicenessFmt(state.Nice),
1306                                 )
1307                         })
1308                         var file SPFile
1309                         if _, err = xdr.Unmarshal(r, &file); err != nil {
1310                                 state.Ctx.LogE("sp-process-file", lesp, err, func(les LEs) string {
1311                                         return fmt.Sprintf(
1312                                                 "SP with %s (nice %s): unmarshaling FILE",
1313                                                 state.Node.Name, NicenessFmt(state.Nice),
1314                                         )
1315                                 })
1316                                 return nil, err
1317                         }
1318                         pktName := Base32Codec.EncodeToString(file.Hash[:])
1319                         lesp = append(
1320                                 lesp,
1321                                 LE{"XX", string(TRx)},
1322                                 LE{"Pkt", pktName},
1323                                 LE{"Size", int64(len(file.Payload))},
1324                         )
1325                         logMsg := func(les LEs) string {
1326                                 return fmt.Sprintf(
1327                                         "Got packet %s (%s)",
1328                                         pktName, humanize.IBytes(uint64(len(file.Payload))),
1329                                 )
1330                         }
1331                         fullsize := int64(0)
1332                         state.RLock()
1333                         infoTheir := state.infosTheir[*file.Hash]
1334                         state.RUnlock()
1335                         if infoTheir == nil {
1336                                 state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string {
1337                                         return logMsg(les) + ": unknown file"
1338                                 })
1339                                 continue
1340                         }
1341                         fullsize = int64(infoTheir.Size)
1342                         lesp = append(lesp, LE{"FullSize", fullsize})
1343                         dirToSync := filepath.Join(
1344                                 state.Ctx.Spool,
1345                                 state.Node.Id.String(),
1346                                 string(TRx),
1347                         )
1348                         filePath := filepath.Join(dirToSync, pktName)
1349                         filePathPart := filePath + PartSuffix
1350                         state.Ctx.LogD("sp-file-open", lesp, func(les LEs) string {
1351                                 return logMsg(les) + ": opening part"
1352                         })
1353                         state.fdsLock.RLock()
1354                         fdAndFullSize, exists := state.fds[filePathPart]
1355                         state.fdsLock.RUnlock()
1356                         hasherAndOffset := state.fileHashers[filePath]
1357                         var fd *os.File
1358                         if exists {
1359                                 fd = fdAndFullSize.fd
1360                         } else {
1361                                 fd, err = os.OpenFile(
1362                                         filePathPart,
1363                                         os.O_RDWR|os.O_CREATE,
1364                                         os.FileMode(0666),
1365                                 )
1366                                 if err != nil {
1367                                         state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string {
1368                                                 return logMsg(les) + ": opening part"
1369                                         })
1370                                         return nil, err
1371                                 }
1372                                 state.fdsLock.Lock()
1373                                 state.fds[filePathPart] = FdAndFullSize{fd: fd}
1374                                 state.fdsLock.Unlock()
1375                                 if !state.NoCK {
1376                                         hasherAndOffset = &MTHAndOffset{
1377                                                 mth:    MTHNew(fullsize, int64(file.Offset)),
1378                                                 offset: file.Offset,
1379                                         }
1380                                         state.fileHashers[filePath] = hasherAndOffset
1381                                 }
1382                         }
1383                         state.Ctx.LogD(
1384                                 "sp-file-seek",
1385                                 append(lesp, LE{"Offset", file.Offset}),
1386                                 func(les LEs) string {
1387                                         return fmt.Sprintf("%s: seeking %d", logMsg(les), file.Offset)
1388                                 })
1389                         if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
1390                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1391                                         return logMsg(les) + ": seeking"
1392                                 })
1393                                 state.closeFd(filePathPart)
1394                                 return nil, err
1395                         }
1396                         state.Ctx.LogD("sp-file-write", lesp, func(les LEs) string {
1397                                 return logMsg(les) + ": writing"
1398                         })
1399                         if _, err = fd.Write(file.Payload); err != nil {
1400                                 state.Ctx.LogE("sp-file-write", lesp, err, func(les LEs) string {
1401                                         return logMsg(les) + ": writing"
1402                                 })
1403                                 state.closeFd(filePathPart)
1404                                 return nil, err
1405                         }
1406                         if hasherAndOffset != nil {
1407                                 if hasherAndOffset.offset == file.Offset {
1408                                         if _, err = hasherAndOffset.mth.Write(file.Payload); err != nil {
1409                                                 panic(err)
1410                                         }
1411                                         hasherAndOffset.offset += uint64(len(file.Payload))
1412                                 } else {
1413                                         state.Ctx.LogE(
1414                                                 "sp-file-offset-differs", lesp, errors.New("offset differs"),
1415                                                 func(les LEs) string {
1416                                                         return logMsg(les) + ": deleting hasher"
1417                                                 },
1418                                         )
1419                                         delete(state.fileHashers, filePath)
1420                                         hasherAndOffset = nil
1421                                 }
1422                         }
1423                         ourSize := int64(file.Offset + uint64(len(file.Payload)))
1424                         lesp[len(lesp)-2].V = ourSize
1425                         if state.Ctx.ShowPrgrs {
1426                                 state.progressBars[pktName] = struct{}{}
1427                                 Progress("Rx", lesp)
1428                         }
1429                         if fullsize != ourSize {
1430                                 continue
1431                         }
1432                         if state.Ctx.ShowPrgrs {
1433                                 delete(state.progressBars, pktName)
1434                         }
1435                         logMsg = func(les LEs) string {
1436                                 return fmt.Sprintf(
1437                                         "Got packet %s %d%% (%s / %s)",
1438                                         pktName, 100*ourSize/fullsize,
1439                                         humanize.IBytes(uint64(ourSize)),
1440                                         humanize.IBytes(uint64(fullsize)),
1441                                 )
1442                         }
1443                         err = fd.Sync()
1444                         if err != nil {
1445                                 state.Ctx.LogE("sp-file-sync", lesp, err, func(les LEs) string {
1446                                         return logMsg(les) + ": syncing"
1447                                 })
1448                                 state.closeFd(filePathPart)
1449                                 continue
1450                         }
1451                         if hasherAndOffset != nil {
1452                                 delete(state.fileHashers, filePath)
1453                                 if hasherAndOffset.mth.PreaddSize() == 0 {
1454                                         if bytes.Compare(hasherAndOffset.mth.Sum(nil), file.Hash[:]) != 0 {
1455                                                 state.Ctx.LogE(
1456                                                         "sp-file-bad-checksum", lesp,
1457                                                         errors.New("checksum mismatch"),
1458                                                         logMsg,
1459                                                 )
1460                                                 state.closeFd(filePathPart)
1461                                                 continue
1462                                         }
1463                                         if err = os.Rename(filePathPart, filePath); err != nil {
1464                                                 state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1465                                                         return logMsg(les) + ": renaming"
1466                                                 })
1467                                                 state.closeFd(filePathPart)
1468                                                 continue
1469                                         }
1470                                         if err = DirSync(dirToSync); err != nil {
1471                                                 state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1472                                                         return logMsg(les) + ": dirsyncing"
1473                                                 })
1474                                                 state.closeFd(filePathPart)
1475                                                 continue
1476                                         }
1477                                         state.Ctx.LogI("sp-file-done", lesp, func(les LEs) string {
1478                                                 return logMsg(les) + ": done"
1479                                         })
1480                                         state.wg.Add(1)
1481                                         go func() {
1482                                                 state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
1483                                                 state.wg.Done()
1484                                         }()
1485                                         state.Lock()
1486                                         delete(state.infosTheir, *file.Hash)
1487                                         state.Unlock()
1488                                         if !state.Ctx.HdrUsage {
1489                                                 continue
1490                                         }
1491                                         if _, err = fd.Seek(0, io.SeekStart); err != nil {
1492                                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1493                                                         return logMsg(les) + ": seeking"
1494                                                 })
1495                                                 state.closeFd(filePathPart)
1496                                                 continue
1497                                         }
1498                                         _, pktEncRaw, err := state.Ctx.HdrRead(fd)
1499                                         state.closeFd(filePathPart)
1500                                         if err != nil {
1501                                                 state.Ctx.LogE("sp-file-hdr-read", lesp, err, func(les LEs) string {
1502                                                         return logMsg(les) + ": HdrReading"
1503                                                 })
1504                                                 continue
1505                                         }
1506                                         state.Ctx.HdrWrite(pktEncRaw, filePath)
1507                                         continue
1508                                 }
1509                         }
1510                         state.closeFd(filePathPart)
1511                         if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil {
1512                                 state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1513                                         return logMsg(les) + ": renaming"
1514                                 })
1515                                 continue
1516                         }
1517                         if err = DirSync(dirToSync); err != nil {
1518                                 state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1519                                         return logMsg(les) + ": dirsyncing"
1520                                 })
1521                                 continue
1522                         }
1523                         state.Ctx.LogI("sp-file-downloaded", lesp, func(les LEs) string {
1524                                 return logMsg(les) + ": downloaded"
1525                         })
1526                         state.Lock()
1527                         delete(state.infosTheir, *file.Hash)
1528                         state.Unlock()
1529                         go func() {
1530                                 t := SPCheckerTask{
1531                                         nodeId: state.Node.Id,
1532                                         hsh:    file.Hash,
1533                                         done:   state.payloads,
1534                                 }
1535                                 if hasherAndOffset != nil {
1536                                         t.mth = hasherAndOffset.mth
1537                                 }
1538                                 spCheckerTasks <- t
1539                         }()
1540
1541                 case SPTypeDone:
1542                         lesp := append(les, LE{"Type", "done"})
1543                         state.Ctx.LogD("sp-process-done-unmarshal", lesp, func(les LEs) string {
1544                                 return fmt.Sprintf(
1545                                         "SP with %s (nice %s): unmarshaling DONE",
1546                                         state.Node.Name, NicenessFmt(state.Nice),
1547                                 )
1548                         })
1549                         var done SPDone
1550                         if _, err = xdr.Unmarshal(r, &done); err != nil {
1551                                 state.Ctx.LogE("sp-process-done-unmarshal", lesp, err, func(les LEs) string {
1552                                         return fmt.Sprintf(
1553                                                 "SP with %s (nice %s): unmarshaling DONE",
1554                                                 state.Node.Name, NicenessFmt(state.Nice),
1555                                         )
1556                                 })
1557                                 return nil, err
1558                         }
1559                         pktName := Base32Codec.EncodeToString(done.Hash[:])
1560                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"XX", string(TTx)})
1561                         logMsg := func(les LEs) string {
1562                                 return fmt.Sprintf(
1563                                         "SP with %s (nice %s): DONE: removing %s",
1564                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1565                                 )
1566                         }
1567                         state.Ctx.LogD("sp-done", lesp, logMsg)
1568                         pth := filepath.Join(
1569                                 state.Ctx.Spool,
1570                                 state.Node.Id.String(),
1571                                 string(TTx),
1572                                 pktName,
1573                         )
1574                         if err = os.Remove(pth); err == nil {
1575                                 state.Ctx.LogI("sp-done", lesp, func(les LEs) string {
1576                                         return fmt.Sprintf("Packet %s is sent", pktName)
1577                                 })
1578                                 if state.Ctx.HdrUsage {
1579                                         os.Remove(pth + HdrSuffix)
1580                                 }
1581                         } else {
1582                                 state.Ctx.LogE("sp-done", lesp, err, logMsg)
1583                         }
1584
1585                 case SPTypeFreq:
1586                         lesp := append(les, LE{"Type", "freq"})
1587                         state.Ctx.LogD("sp-process-freq", lesp, func(les LEs) string {
1588                                 return fmt.Sprintf(
1589                                         "SP with %s (nice %s): unmarshaling FREQ",
1590                                         state.Node.Name, NicenessFmt(state.Nice),
1591                                 )
1592                         })
1593                         var freq SPFreq
1594                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
1595                                 state.Ctx.LogE("sp-process-freq", lesp, err, func(les LEs) string {
1596                                         return fmt.Sprintf(
1597                                                 "SP with %s (nice %s): unmarshaling FREQ",
1598                                                 state.Node.Name, NicenessFmt(state.Nice),
1599                                         )
1600                                 })
1601                                 return nil, err
1602                         }
1603                         pktName := Base32Codec.EncodeToString(freq.Hash[:])
1604                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"Offset", freq.Offset})
1605                         state.Ctx.LogD("sp-process-freq-queueing", lesp, func(les LEs) string {
1606                                 return fmt.Sprintf(
1607                                         "SP with %s (nice %s): FREQ %s: queuing",
1608                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1609                                 )
1610                         })
1611                         nice, exists := state.infosOurSeen[*freq.Hash]
1612                         if exists {
1613                                 if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
1614                                         state.Lock()
1615                                         insertIdx := 0
1616                                         var freqWithNice *FreqWithNice
1617                                         for insertIdx, freqWithNice = range state.queueTheir {
1618                                                 if freqWithNice.nice > nice {
1619                                                         break
1620                                                 }
1621                                         }
1622                                         state.queueTheir = append(state.queueTheir, nil)
1623                                         copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
1624                                         state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
1625                                         state.Unlock()
1626                                 } else {
1627                                         state.Ctx.LogD("sp-process-freq-skip", lesp, func(les LEs) string {
1628                                                 return fmt.Sprintf(
1629                                                         "SP with %s (nice %s): FREQ %s: skipping",
1630                                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1631                                                 )
1632                                         })
1633                                 }
1634                         } else {
1635                                 state.Ctx.LogD("sp-process-freq-unknown", lesp, func(les LEs) string {
1636                                         return fmt.Sprintf(
1637                                                 "SP with %s (nice %s): FREQ %s: unknown",
1638                                                 state.Node.Name, NicenessFmt(state.Nice), pktName,
1639                                         )
1640                                 })
1641                         }
1642
1643                 default:
1644                         state.Ctx.LogE(
1645                                 "sp-process-type-unknown",
1646                                 append(les, LE{"Type", head.Type}),
1647                                 errors.New("unknown type"),
1648                                 func(les LEs) string {
1649                                         return fmt.Sprintf(
1650                                                 "SP with %s (nice %s): %d",
1651                                                 state.Node.Name, NicenessFmt(state.Nice), head.Type,
1652                                         )
1653                                 },
1654                         )
1655                         return nil, BadPktType
1656                 }
1657         }
1658
1659         if infosGot {
1660                 var pkts int
1661                 var size uint64
1662                 state.RLock()
1663                 for _, info := range state.infosTheir {
1664                         pkts++
1665                         size += info.Size
1666                 }
1667                 state.RUnlock()
1668                 state.Ctx.LogI("sp-infos-rx", LEs{
1669                         {"XX", string(TRx)},
1670                         {"Node", state.Node.Id},
1671                         {"Pkts", pkts},
1672                         {"Size", int64(size)},
1673                 }, func(les LEs) string {
1674                         return fmt.Sprintf(
1675                                 "%s has got for us: %d packets, %s",
1676                                 state.Node.Name, pkts, humanize.IBytes(size),
1677                         )
1678                 })
1679         }
1680         return payloadsSplit(replies), nil
1681 }
1682
1683 func SPChecker(ctx *Ctx) {
1684         for t := range spCheckerTasks {
1685                 pktName := Base32Codec.EncodeToString(t.hsh[:])
1686                 les := LEs{
1687                         {"XX", string(TRx)},
1688                         {"Node", t.nodeId},
1689                         {"Pkt", pktName},
1690                 }
1691                 SPCheckerWg.Add(1)
1692                 ctx.LogD("sp-checker", les, func(les LEs) string {
1693                         return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(t.nodeId), pktName)
1694                 })
1695                 size, err := ctx.CheckNoCK(t.nodeId, t.hsh, t.mth)
1696                 les = append(les, LE{"Size", size})
1697                 if err != nil {
1698                         ctx.LogE("sp-checker", les, err, func(les LEs) string {
1699                                 return fmt.Sprintf(
1700                                         "Checksumming %s/rx/%s (%s)", ctx.NodeName(t.nodeId), pktName,
1701                                         humanize.IBytes(uint64(size)),
1702                                 )
1703                         })
1704                         SPCheckerWg.Done()
1705                         continue
1706                 }
1707                 ctx.LogI("sp-checker-done", les, func(les LEs) string {
1708                         return fmt.Sprintf(
1709                                 "Packet %s is retreived (%s)",
1710                                 pktName, humanize.IBytes(uint64(size)),
1711                         )
1712                 })
1713                 SPCheckerWg.Done()
1714                 go func(t SPCheckerTask) {
1715                         defer func() { recover() }()
1716                         t.done <- MarshalSP(SPTypeDone, SPDone{t.hsh})
1717                 }(t)
1718         }
1719 }