Cypherpunks.ru repositories - nncp.git/blob - src/sp.go
Note about buildability with 1.22
[nncp.git] / src / sp.go
1 // NNCP -- Node to Node copy, utilities for store-and-forward data exchange
2 // Copyright (C) 2016-2024 Sergey Matveev <stargrave@stargrave.org>
3 //
4 // This program is free software: you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation, version 3 of the License.
7 //
8 // This program is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 // GNU General Public License for more details.
12 //
13 // You should have received a copy of the GNU General Public License
14 // along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
16 package nncp
17
18 import (
19         "bytes"
20         "crypto/subtle"
21         "errors"
22         "fmt"
23         "io"
24         "log"
25         "os"
26         "path/filepath"
27         "sort"
28         "strconv"
29         "sync"
30         "time"
31
32         xdr "github.com/davecgh/go-xdr/xdr2"
33         "github.com/dustin/go-humanize"
34         "github.com/flynn/noise"
35 )
36
// Constants of the sync protocol (SP).
const (
	// MaxSPSize is the byte budget used when batching SP payloads
	// (see payloadsSplit) and when padding the first handshake payload.
	MaxSPSize      = 1<<16 - 256
	// PartSuffix is the ".part" file name suffix.
	// NOTE(review): presumably marks partly received packets -- confirm
	// against its users, none of which are in this part of the file.
	PartSuffix     = ".part"
	// SPHeadOverhead is the length in bytes of the buffers that hold a
	// marshalled SPHead (SPHaltMarshalized, SPPingMarshalized).
	SPHeadOverhead = 4
	// CfgDeadline names the environment variable which, when set to an
	// integer number of seconds, overrides DefaultDeadline in init.
	CfgDeadline    = "NNCPDEADLINE"
)
43
// MTHAndOffset pairs an MTH value with an offset. Pointers to it are
// kept per key in SPState.fileHashers.
// NOTE(review): offset is presumably how far into the file the hasher
// has been fed -- confirm against the code that updates it.
type MTHAndOffset struct {
	mth    MTH
	offset uint64
}
48
// SPCheckerTask is one unit of work sent over the spCheckerTasks channel.
// StartWorkers submits such tasks with nodeId, hsh and done filled in,
// where done is the session's outgoing payloads channel.
type SPCheckerTask struct {
	nodeId *NodeId
	hsh    *[MTHSize]byte
	mth    MTH
	done   chan []byte
}
55
// Package-level SP state. Everything that has no initializer here is
// filled in by init.
var (
	// XDR-marshalled sizes of a sample SPInfo, SPFreq and payload-less
	// SPFile, measured once in init.
	SPInfoOverhead    int
	SPFreqOverhead    int
	SPFileOverhead    int
	// Pre-marshalled SPHead values of type HALT and PING, each
	// SPHeadOverhead bytes long.
	SPHaltMarshalized []byte
	SPPingMarshalized []byte

	// NoiseCipherSuite is the Noise suite used for the SP handshake:
	// Curve25519 DH, ChaCha20-Poly1305 and BLAKE2b.
	NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
		noise.DH25519,
		noise.CipherChaChaPoly,
		noise.HashBLAKE2b,
	)

	// DefaultDeadline bounds each handshake read and write; it can be
	// overridden through the CfgDeadline environment variable.
	DefaultDeadline = 10 * time.Second
	// PingTimeout is the period of the ping ticker in StartWorkers.
	PingTimeout     = time.Minute

	// spCheckerTasks feeds SPCheckerTask values to the checker goroutine,
	// which spCheckerOnce ensures is started at most once.
	spCheckerTasks chan SPCheckerTask
	SPCheckerWg    sync.WaitGroup
	spCheckerOnce  sync.Once
)
76
// FdAndFullSize couples an open file with a size value. Values are
// stored by key in SPState.fds and closed through closeFd.
// NOTE(review): fullSize is presumably the expected final size of the
// file -- confirm against the code that fills it in.
type FdAndFullSize struct {
	fd       *os.File
	fullSize int64
}
81
// SPType is the one-byte discriminator carried in SPHead that tells
// which kind of SP packet follows.
type SPType uint8

// SP packet types, numbered consecutively from zero.
const (
	SPTypeInfo SPType = iota
	SPTypeFreq
	SPTypeFile
	SPTypeDone
	SPTypeHalt
	SPTypePing
)
92
// SPHead is the header that MarshalSP writes in front of every SP
// packet body; it carries only the packet type.
type SPHead struct {
	Type SPType
}
96
// SPInfo announces one packet: its niceness, its size and its hash.
// infosOur builds these from the node's outgoing (tx) jobs.
type SPInfo struct {
	Nice uint8
	Size uint64
	Hash *[MTHSize]byte
}
102
// SPFreq names a packet by hash together with an offset.
// NOTE(review): presumably a request to send that packet starting at
// Offset -- confirm against the code that processes it.
type SPFreq struct {
	Hash   *[MTHSize]byte
	Offset uint64
}
107
// SPFile carries a chunk of data (Payload) belonging to the packet
// identified by Hash, together with an offset.
// NOTE(review): Offset is presumably the position of Payload inside the
// packet -- confirm against the sender/receiver code.
type SPFile struct {
	Hash    *[MTHSize]byte
	Offset  uint64
	Payload []byte
}
113
// SPDone refers to a single packet by its hash.
// NOTE(review): presumably acknowledges complete reception of that
// packet -- confirm against the code that processes it.
type SPDone struct {
	Hash *[MTHSize]byte
}
117
// SPRaw is the outermost frame put on the wire: WriteSP marshals every
// outgoing buffer as an SPRaw with the MagicNNCPSv1 magic, and ReadSP
// rejects frames whose Magic differs from it.
type SPRaw struct {
	Magic   [8]byte
	Payload []byte
}
122
// FreqWithNice pairs an SPFreq with a niceness value; SPState.queueTheir
// is a slice of these.
type FreqWithNice struct {
	freq *SPFreq
	nice uint8
}
127
// ConnDeadlined is the connection abstraction the SP code talks over: a
// closable byte stream that supports separate read and write deadlines.
type ConnDeadlined interface {
	io.ReadWriteCloser
	SetReadDeadline(t time.Time) error
	SetWriteDeadline(t time.Time) error
}
133
134 func init() {
135         if v := os.Getenv(CfgDeadline); v != "" {
136                 i, err := strconv.Atoi(v)
137                 if err != nil {
138                         log.Fatalln("Can not convert", CfgDeadline, "to integer:", err)
139                 }
140                 DefaultDeadline = time.Duration(i) * time.Second
141         }
142
143         var buf bytes.Buffer
144         spHead := SPHead{Type: SPTypeHalt}
145         if _, err := xdr.Marshal(&buf, spHead); err != nil {
146                 panic(err)
147         }
148         SPHaltMarshalized = make([]byte, SPHeadOverhead)
149         copy(SPHaltMarshalized, buf.Bytes())
150         buf.Reset()
151
152         spHead = SPHead{Type: SPTypePing}
153         if _, err := xdr.Marshal(&buf, spHead); err != nil {
154                 panic(err)
155         }
156         SPPingMarshalized = make([]byte, SPHeadOverhead)
157         copy(SPPingMarshalized, buf.Bytes())
158         buf.Reset()
159
160         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([MTHSize]byte)}
161         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
162                 panic(err)
163         }
164         SPInfoOverhead = buf.Len()
165         buf.Reset()
166
167         spFreq := SPFreq{Hash: new([MTHSize]byte), Offset: 123}
168         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
169                 panic(err)
170         }
171         SPFreqOverhead = buf.Len()
172         buf.Reset()
173
174         spFile := SPFile{Hash: new([MTHSize]byte), Offset: 123}
175         if _, err := xdr.Marshal(&buf, spFile); err != nil {
176                 panic(err)
177         }
178         SPFileOverhead = buf.Len()
179         spCheckerTasks = make(chan SPCheckerTask)
180 }
181
182 func MarshalSP(typ SPType, sp interface{}) []byte {
183         var buf bytes.Buffer
184         if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil {
185                 panic(err)
186         }
187         if _, err := xdr.Marshal(&buf, sp); err != nil {
188                 panic(err)
189         }
190         return buf.Bytes()
191 }
192
193 func payloadsSplit(payloads [][]byte) [][]byte {
194         var outbounds [][]byte
195         outbound := make([]byte, 0, MaxSPSize)
196         for i, payload := range payloads {
197                 outbound = append(outbound, payload...)
198                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
199                         outbounds = append(outbounds, outbound)
200                         outbound = make([]byte, 0, MaxSPSize)
201                 }
202         }
203         if len(outbound) > 0 {
204                 outbounds = append(outbounds, outbound)
205         }
206         return outbounds
207 }
208
// SPState holds everything that belongs to one SP session with a single
// node. StartI (initiator) or StartR (responder) fills in the handshake
// related fields and then StartWorkers runs the session's goroutines.
// The embedded RWMutex is taken by SetDead; fdsLock is taken by closeFd
// around access to fds.
type SPState struct {
	Ctx            *Ctx
	// Node is the remote side. StartR sets it itself after matching the
	// peer's Noise static key against Ctx.Neigh.
	Node           *Node
	// Nice is the niceness ceiling: infosOur skips jobs with a higher value.
	Nice           uint8
	// NoCK, when false, makes StartWorkers start the checker goroutine
	// and feed it the node's not yet checked jobs.
	NoCK           bool
	// Consulted by the periodic deadline job in StartWorkers.
	onlineDeadline time.Duration
	maxOnlineTime  time.Duration
	// Noise handshake state and the two cipher states it produces.
	hs             *noise.HandshakeState
	csOur          *noise.CipherState
	csTheir        *noise.CipherState
	// payloads carries marshalled payloads queued for sending; pings
	// signals that a ping has to be sent.
	payloads       chan []byte
	pings          chan struct{}
	infosTheir     map[[MTHSize]byte]*SPInfo
	// infosOurSeen maps hashes of packets infosOur has already announced
	// to their niceness, so that they are announced only once.
	infosOurSeen   map[[MTHSize]byte]uint8
	queueTheir     []*FreqWithNice
	wg             sync.WaitGroup
	// Traffic counters and timestamps; the Tx ones are updated by
	// WriteSP, the Rx ones by ReadSP.
	RxBytes        int64
	RxLastSeen     time.Time
	RxLastNonPing  time.Time
	TxBytes        int64
	TxLastSeen     time.Time
	TxLastNonPing  time.Time
	started        time.Time
	// mustFinishAt is started+maxOnlineTime when maxOnlineTime is set.
	mustFinishAt   time.Time
	Duration       time.Duration
	RxSpeed        int64
	TxSpeed        int64
	// Directory locks held for the session; released by dirUnlock.
	rxLock         *os.File
	txLock         *os.File
	// xxOnly restricts the session to one direction (TRx or TTx); the
	// empty value means both.
	xxOnly         TRxTx
	rxRate         int
	txRate         int
	// isDead is closed by SetDead; NotAlive reports whether that happened.
	isDead         chan struct{}
	// listOnly, when set, makes StartI take no directory locks and send
	// no INFOs of our own.
	listOnly       bool
	onlyPkts       map[[MTHSize]byte]bool
	// writeSPBuf is the scratch buffer reused by WriteSP for each frame.
	writeSPBuf     bytes.Buffer
	fds            map[string]FdAndFullSize
	fdsLock        sync.RWMutex
	fileHashers    map[string]*MTHAndOffset
	progressBars   map[string]struct{}
	sync.RWMutex
}
251
252 func (state *SPState) SetDead() {
253         state.Lock()
254         defer state.Unlock()
255         select {
256         case <-state.isDead:
257                 // Already closed channel, dead
258                 return
259         default:
260         }
261         close(state.isDead)
262         go func() {
263                 for range state.payloads {
264                 }
265         }()
266         go func() {
267                 for range state.pings {
268                 }
269         }()
270 }
271
272 func (state *SPState) NotAlive() bool {
273         select {
274         case <-state.isDead:
275                 return true
276         default:
277         }
278         return false
279 }
280
281 func (state *SPState) dirUnlock() {
282         state.Ctx.UnlockDir(state.rxLock)
283         state.Ctx.UnlockDir(state.txLock)
284 }
285
286 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
287         state.writeSPBuf.Reset()
288         if _, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
289                 Magic:   MagicNNCPSv1.B,
290                 Payload: payload,
291         }); err != nil {
292                 return err
293         }
294         n, err := dst.Write(state.writeSPBuf.Bytes())
295         if err != nil {
296                 return err
297         }
298         state.TxLastSeen = time.Now()
299         state.TxBytes += int64(n)
300         if !ping {
301                 state.TxLastNonPing = state.TxLastSeen
302         }
303         return nil
304 }
305
306 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
307         var sp SPRaw
308         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
309         if err != nil {
310                 ue := err.(*xdr.UnmarshalError)
311                 if ue.Err == io.EOF {
312                         return nil, ue.Err
313                 }
314                 return nil, err
315         }
316         state.RxLastSeen = time.Now()
317         state.RxBytes += int64(n)
318         if sp.Magic != MagicNNCPSv1.B {
319                 return nil, BadMagic
320         }
321         return sp.Payload, nil
322 }
323
324 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[MTHSize]byte]uint8) [][]byte {
325         var infos []*SPInfo
326         var totalSize int64
327         for job := range ctx.Jobs(nodeId, TTx) {
328                 if job.PktEnc.Nice > nice {
329                         continue
330                 }
331                 if _, known := (*seen)[*job.HshValue]; known {
332                         continue
333                 }
334                 totalSize += job.Size
335                 infos = append(infos, &SPInfo{
336                         Nice: job.PktEnc.Nice,
337                         Size: uint64(job.Size),
338                         Hash: job.HshValue,
339                 })
340                 (*seen)[*job.HshValue] = job.PktEnc.Nice
341         }
342         sort.Sort(ByNice(infos))
343         var payloads [][]byte
344         for _, info := range infos {
345                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
346                 pktName := Base32Codec.EncodeToString(info.Hash[:])
347                 ctx.LogD("sp-info-our", LEs{
348                         {"Node", nodeId},
349                         {"Name", pktName},
350                         {"Size", info.Size},
351                 }, func(les LEs) string {
352                         return fmt.Sprintf(
353                                 "Our info: %s/tx/%s (%s)",
354                                 ctx.NodeName(nodeId),
355                                 pktName,
356                                 humanize.IBytes(info.Size),
357                         )
358                 })
359         }
360         if totalSize > 0 {
361                 ctx.LogI("sp-infos-tx", LEs{
362                         {"XX", string(TTx)},
363                         {"Node", nodeId},
364                         {"Pkts", len(payloads)},
365                         {"Size", totalSize},
366                 }, func(les LEs) string {
367                         return fmt.Sprintf(
368                                 "We have got for %s: %d packets, %s",
369                                 ctx.NodeName(nodeId),
370                                 len(payloads),
371                                 humanize.IBytes(uint64(totalSize)),
372                         )
373                 })
374         }
375         return payloadsSplit(payloads)
376 }
377
378 func (state *SPState) StartI(conn ConnDeadlined) error {
379         nodeId := state.Node.Id
380         err := state.Ctx.ensureRxDir(nodeId)
381         if err != nil {
382                 return err
383         }
384         var rxLock *os.File
385         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
386                 rxLock, err = state.Ctx.LockDir(nodeId, string(TRx))
387                 if err != nil {
388                         return err
389                 }
390         }
391         var txLock *os.File
392         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
393                 txLock, err = state.Ctx.LockDir(nodeId, string(TTx))
394                 if err != nil {
395                         return err
396                 }
397         }
398         started := time.Now()
399         conf := noise.Config{
400                 CipherSuite: NoiseCipherSuite,
401                 Pattern:     noise.HandshakeIK,
402                 Initiator:   true,
403                 StaticKeypair: noise.DHKey{
404                         Private: state.Ctx.Self.NoisePrv[:],
405                         Public:  state.Ctx.Self.NoisePub[:],
406                 },
407                 PeerStatic: state.Node.NoisePub[:],
408         }
409         hs, err := noise.NewHandshakeState(conf)
410         if err != nil {
411                 return err
412         }
413         state.hs = hs
414         state.payloads = make(chan []byte)
415         state.pings = make(chan struct{})
416         state.infosTheir = make(map[[MTHSize]byte]*SPInfo)
417         state.infosOurSeen = make(map[[MTHSize]byte]uint8)
418         state.progressBars = make(map[string]struct{})
419         state.started = started
420         state.rxLock = rxLock
421         state.txLock = txLock
422
423         var infosPayloads [][]byte
424         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
425                 infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
426         }
427         var firstPayload []byte
428         if len(infosPayloads) > 0 {
429                 firstPayload = infosPayloads[0]
430         }
431         // Pad first payload, to hide actual number of existing files
432         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
433                 firstPayload = append(firstPayload, SPHaltMarshalized...)
434         }
435
436         var buf []byte
437         var payload []byte
438         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
439         if err != nil {
440                 state.dirUnlock()
441                 return err
442         }
443         les := LEs{{"Node", nodeId}, {"Nice", int(state.Nice)}}
444         state.Ctx.LogD("sp-startI", les, func(les LEs) string {
445                 return fmt.Sprintf(
446                         "SP with %s (nice %s): sending first message",
447                         state.Node.Name,
448                         NicenessFmt(state.Nice),
449                 )
450         })
451         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
452         if err = state.WriteSP(conn, buf, false); err != nil {
453                 state.Ctx.LogE("sp-startI", les, err, func(les LEs) string {
454                         return fmt.Sprintf(
455                                 "SP with %s (nice %s): writing",
456                                 state.Node.Name,
457                                 NicenessFmt(state.Nice),
458                         )
459                 })
460                 state.dirUnlock()
461                 return err
462         }
463         state.Ctx.LogD("sp-startI-wait", les, func(les LEs) string {
464                 return fmt.Sprintf(
465                         "SP with %s (nice %s): waiting for first message",
466                         state.Node.Name,
467                         NicenessFmt(state.Nice),
468                 )
469         })
470         conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
471         if buf, err = state.ReadSP(conn); err != nil {
472                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
473                         return fmt.Sprintf(
474                                 "SP with %s (nice %s): reading",
475                                 state.Node.Name,
476                                 NicenessFmt(state.Nice),
477                         )
478                 })
479                 state.dirUnlock()
480                 return err
481         }
482         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
483         if err != nil {
484                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
485                         return fmt.Sprintf(
486                                 "SP with %s (nice %s): reading Noise message",
487                                 state.Node.Name,
488                                 NicenessFmt(state.Nice),
489                         )
490                 })
491                 state.dirUnlock()
492                 return err
493         }
494         state.Ctx.LogD("sp-startI-workers", les, func(les LEs) string {
495                 return fmt.Sprintf(
496                         "SP with %s (nice %s): starting workers",
497                         state.Node.Name,
498                         NicenessFmt(state.Nice),
499                 )
500         })
501         err = state.StartWorkers(conn, infosPayloads, payload)
502         if err != nil {
503                 state.Ctx.LogE("sp-startI-workers", les, err, func(les LEs) string {
504                         return fmt.Sprintf(
505                                 "SP with %s (nice %s): starting workers",
506                                 state.Node.Name,
507                                 NicenessFmt(state.Nice),
508                         )
509                 })
510                 state.dirUnlock()
511         }
512         return err
513 }
514
515 func (state *SPState) StartR(conn ConnDeadlined) error {
516         started := time.Now()
517         conf := noise.Config{
518                 CipherSuite: NoiseCipherSuite,
519                 Pattern:     noise.HandshakeIK,
520                 Initiator:   false,
521                 StaticKeypair: noise.DHKey{
522                         Private: state.Ctx.Self.NoisePrv[:],
523                         Public:  state.Ctx.Self.NoisePub[:],
524                 },
525         }
526         hs, err := noise.NewHandshakeState(conf)
527         if err != nil {
528                 return err
529         }
530         xxOnly := TRxTx("")
531         state.hs = hs
532         state.payloads = make(chan []byte)
533         state.pings = make(chan struct{})
534         state.infosOurSeen = make(map[[MTHSize]byte]uint8)
535         state.infosTheir = make(map[[MTHSize]byte]*SPInfo)
536         state.progressBars = make(map[string]struct{})
537         state.started = started
538         state.xxOnly = xxOnly
539
540         var buf []byte
541         var payload []byte
542         logMsg := func(les LEs) string {
543                 return fmt.Sprintf(
544                         "SP nice %s: waiting for first message",
545                         NicenessFmt(state.Nice),
546                 )
547         }
548         les := LEs{{"Nice", int(state.Nice)}}
549         state.Ctx.LogD("sp-startR", les, logMsg)
550         conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
551         if buf, err = state.ReadSP(conn); err != nil {
552                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
553                 return err
554         }
555         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
556                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
557                 return err
558         }
559
560         var node *Node
561         for _, n := range state.Ctx.Neigh {
562                 if n.NoisePub == nil {
563                         continue
564                 }
565                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
566                         node = n
567                         break
568                 }
569         }
570         if node == nil {
571                 peerId := Base32Codec.EncodeToString(state.hs.PeerStatic())
572                 err = errors.New("unknown peer: " + peerId)
573                 state.Ctx.LogE("sp-startR-unknown", append(les, LE{"Peer", peerId}), err, logMsg)
574                 return err
575         }
576         state.Node = node
577         state.rxRate = node.RxRate
578         state.txRate = node.TxRate
579         state.onlineDeadline = node.OnlineDeadline
580         state.maxOnlineTime = node.MaxOnlineTime
581         les = LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}}
582
583         if err = state.Ctx.ensureRxDir(node.Id); err != nil {
584                 return err
585         }
586         var rxLock *os.File
587         if xxOnly == "" || xxOnly == TRx {
588                 rxLock, err = state.Ctx.LockDir(node.Id, string(TRx))
589                 if err != nil {
590                         return err
591                 }
592         }
593         state.rxLock = rxLock
594         var txLock *os.File
595         if xxOnly == "" || xxOnly == TTx {
596                 txLock, err = state.Ctx.LockDir(node.Id, string(TTx))
597                 if err != nil {
598                         return err
599                 }
600         }
601         state.txLock = txLock
602
603         var infosPayloads [][]byte
604         if xxOnly == "" || xxOnly == TTx {
605                 infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
606         }
607         var firstPayload []byte
608         if len(infosPayloads) > 0 {
609                 firstPayload = infosPayloads[0]
610         }
611         // Pad first payload, to hide actual number of existing files
612         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
613                 firstPayload = append(firstPayload, SPHaltMarshalized...)
614         }
615
616         state.Ctx.LogD("sp-startR-write", les, func(les LEs) string {
617                 return fmt.Sprintf(
618                         "SP with %s (nice %s): sending first message",
619                         node.Name, NicenessFmt(state.Nice),
620                 )
621         })
622         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
623         if err != nil {
624                 state.dirUnlock()
625                 return err
626         }
627         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
628         if err = state.WriteSP(conn, buf, false); err != nil {
629                 state.Ctx.LogE("sp-startR-write", les, err, func(les LEs) string {
630                         return fmt.Sprintf(
631                                 "SP with %s (nice %s): writing",
632                                 node.Name, NicenessFmt(state.Nice),
633                         )
634                 })
635                 state.dirUnlock()
636                 return err
637         }
638         state.Ctx.LogD("sp-startR-workers", les, func(les LEs) string {
639                 return fmt.Sprintf(
640                         "SP with %s (nice %s): starting workers",
641                         node.Name, NicenessFmt(state.Nice),
642                 )
643         })
644         err = state.StartWorkers(conn, infosPayloads, payload)
645         if err != nil {
646                 state.dirUnlock()
647         }
648         return err
649 }
650
651 func (state *SPState) closeFd(pth string) {
652         state.fdsLock.Lock()
653         if s, exists := state.fds[pth]; exists {
654                 delete(state.fds, pth)
655                 s.fd.Close()
656         }
657         state.fdsLock.Unlock()
658 }
659
660 func (state *SPState) StartWorkers(
661         conn ConnDeadlined,
662         infosPayloads [][]byte,
663         payload []byte,
664 ) error {
665         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
666         state.fds = make(map[string]FdAndFullSize)
667         state.fileHashers = make(map[string]*MTHAndOffset)
668         state.isDead = make(chan struct{})
669         if state.maxOnlineTime > 0 {
670                 state.mustFinishAt = state.started.Add(state.maxOnlineTime)
671         }
672         if !state.NoCK {
673                 spCheckerOnce.Do(func() { go SPChecker(state.Ctx) })
674                 go func() {
675                         for job := range state.Ctx.JobsNoCK(state.Node.Id) {
676                                 if job.PktEnc.Nice <= state.Nice {
677                                         spCheckerTasks <- SPCheckerTask{
678                                                 nodeId: state.Node.Id,
679                                                 hsh:    job.HshValue,
680                                                 done:   state.payloads,
681                                         }
682                                 }
683                         }
684                 }()
685         }
686
687         // Remaining handshake payload sending
688         if len(infosPayloads) > 1 {
689                 state.wg.Add(1)
690                 go func() {
691                         for _, payload := range infosPayloads[1:] {
692                                 state.Ctx.LogD(
693                                         "sp-queue-remaining",
694                                         append(les, LE{"Size", int64(len(payload))}),
695                                         func(les LEs) string {
696                                                 return fmt.Sprintf(
697                                                         "SP with %s (nice %s): queuing remaining payload (%s)",
698                                                         state.Node.Name, NicenessFmt(state.Nice),
699                                                         humanize.IBytes(uint64(len(payload))),
700                                                 )
701                                         },
702                                 )
703                                 state.payloads <- payload
704                         }
705                         state.wg.Done()
706                 }()
707         }
708
709         // Processing of first payload and queueing its responses
710         logMsg := func(les LEs) string {
711                 return fmt.Sprintf(
712                         "SP with %s (nice %s): processing first payload (%s)",
713                         state.Node.Name, NicenessFmt(state.Nice),
714                         humanize.IBytes(uint64(len(payload))),
715                 )
716         }
717         state.Ctx.LogD("sp-process", append(les, LE{"Size", int64(len(payload))}), logMsg)
718         replies, err := state.ProcessSP(payload)
719         if err != nil {
720                 state.Ctx.LogE("sp-process", les, err, logMsg)
721                 return err
722         }
723         state.wg.Add(1)
724         go func() {
725                 for _, reply := range replies {
726                         state.Ctx.LogD(
727                                 "sp-queue-reply",
728                                 append(les, LE{"Size", int64(len(reply))}),
729                                 func(les LEs) string {
730                                         return fmt.Sprintf(
731                                                 "SP with %s (nice %s): queuing reply (%s)",
732                                                 state.Node.Name, NicenessFmt(state.Nice),
733                                                 humanize.IBytes(uint64(len(payload))),
734                                         )
735                                 },
736                         )
737                         state.payloads <- reply
738                 }
739                 state.wg.Done()
740         }()
741
742         // Periodic jobs
743         state.wg.Add(1)
744         go func() {
745                 deadlineTicker := time.NewTicker(time.Second)
746                 pingTicker := time.NewTicker(PingTimeout)
747                 for {
748                         select {
749                         case <-state.isDead:
750                                 state.wg.Done()
751                                 deadlineTicker.Stop()
752                                 pingTicker.Stop()
753                                 return
754                         case now := <-deadlineTicker.C:
755                                 if now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
756                                         now.Sub(state.TxLastNonPing) >= state.onlineDeadline {
757                                         goto Deadlined
758                                 }
759                                 if state.maxOnlineTime > 0 && state.mustFinishAt.Before(now) {
760                                         goto Deadlined
761                                 }
762                                 if now.Sub(state.RxLastSeen) >= 2*PingTimeout {
763                                         goto Deadlined
764                                 }
765                                 break
766                         Deadlined:
767                                 state.SetDead()
768                                 conn.Close()
769                         case now := <-pingTicker.C:
770                                 if now.After(state.TxLastSeen.Add(PingTimeout)) {
771                                         state.wg.Add(1)
772                                         go func() {
773                                                 state.pings <- struct{}{}
774                                                 state.wg.Done()
775                                         }()
776                                 }
777                         }
778                 }
779         }()
780
781         // Spool checker and INFOs sender of appearing files
782         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
783                 state.wg.Add(1)
784                 go func() {
785                         dw, err := state.Ctx.NewDirWatcher(
786                                 filepath.Join(state.Ctx.Spool, state.Node.Id.String(), string(TTx)),
787                                 time.Second,
788                         )
789                         if err != nil {
790                                 state.Ctx.LogE("sp-queue-dir-watch", les, err, logMsg)
791                                 log.Fatalln(err)
792                         }
793                         for {
794                                 select {
795                                 case <-state.isDead:
796                                         dw.Close()
797                                         state.wg.Done()
798                                         return
799                                 case <-dw.C:
800                                         for _, payload := range state.Ctx.infosOur(
801                                                 state.Node.Id,
802                                                 state.Nice,
803                                                 &state.infosOurSeen,
804                                         ) {
805                                                 state.Ctx.LogD(
806                                                         "sp-queue-info",
807                                                         append(les, LE{"Size", int64(len(payload))}),
808                                                         func(les LEs) string {
809                                                                 return fmt.Sprintf(
810                                                                         "SP with %s (nice %s): queuing new info (%s)",
811                                                                         state.Node.Name, NicenessFmt(state.Nice),
812                                                                         humanize.IBytes(uint64(len(payload))),
813                                                                 )
814                                                         },
815                                                 )
816                                                 state.payloads <- payload
817                                         }
818                                 }
819                         }
820                 }()
821         }
822
823         // Sender
824         state.wg.Add(1)
825         go func() {
826                 defer conn.Close()
827                 defer state.SetDead()
828                 defer state.wg.Done()
829                 buf := make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
830                 for {
831                         if state.NotAlive() {
832                                 return
833                         }
834                         var payload []byte
835                         var ping bool
836                         select {
837                         case <-state.pings:
838                                 state.Ctx.LogD("sp-got-ping", les, func(les LEs) string {
839                                         return fmt.Sprintf(
840                                                 "SP with %s (nice %s): got ping",
841                                                 state.Node.Name, NicenessFmt(state.Nice),
842                                         )
843                                 })
844                                 payload = SPPingMarshalized
845                                 ping = true
846                         case payload = <-state.payloads:
847                                 state.Ctx.LogD(
848                                         "sp-got-payload",
849                                         append(les, LE{"Size", int64(len(payload))}),
850                                         func(les LEs) string {
851                                                 return fmt.Sprintf(
852                                                         "SP with %s (nice %s): got payload (%s)",
853                                                         state.Node.Name, NicenessFmt(state.Nice),
854                                                         humanize.IBytes(uint64(len(payload))),
855                                                 )
856                                         },
857                                 )
858                         default:
859                                 state.RLock()
860                                 if len(state.queueTheir) == 0 {
861                                         state.RUnlock()
862                                         time.Sleep(100 * time.Millisecond)
863                                         continue
864                                 }
865                                 freq := state.queueTheir[0].freq
866                                 state.RUnlock()
867                                 if state.txRate > 0 {
868                                         time.Sleep(time.Second / time.Duration(state.txRate))
869                                 }
870                                 pktName := Base32Codec.EncodeToString(freq.Hash[:])
871                                 lesp := append(
872                                         les,
873                                         LE{"XX", string(TTx)},
874                                         LE{"Pkt", pktName},
875                                         LE{"Size", int64(freq.Offset)},
876                                 )
877                                 logMsg := func(les LEs) string {
878                                         return fmt.Sprintf(
879                                                 "SP with %s (nice %s): tx/%s (%s)",
880                                                 state.Node.Name, NicenessFmt(state.Nice),
881                                                 pktName,
882                                                 humanize.IBytes(freq.Offset),
883                                         )
884                                 }
885                                 state.Ctx.LogD("sp-queue", lesp, func(les LEs) string {
886                                         return logMsg(les) + ": queueing"
887                                 })
888                                 pth := filepath.Join(
889                                         state.Ctx.Spool,
890                                         state.Node.Id.String(),
891                                         string(TTx),
892                                         Base32Codec.EncodeToString(freq.Hash[:]),
893                                 )
894                                 state.fdsLock.RLock()
895                                 fdAndFullSize, exists := state.fds[pth]
896                                 state.fdsLock.RUnlock()
897                                 if !exists {
898                                         state.Ctx.LogD("sp-queue-open", lesp, func(les LEs) string {
899                                                 return logMsg(les) + ": opening"
900                                         })
901                                         fd, err := os.Open(pth)
902                                         if err != nil {
903                                                 state.Ctx.LogE("sp-queue-open", lesp, err, func(les LEs) string {
904                                                         return logMsg(les) + ": opening"
905                                                 })
906                                                 return
907                                         }
908                                         fi, err := fd.Stat()
909                                         if err != nil {
910                                                 state.Ctx.LogE("sp-queue-stat", lesp, err, func(les LEs) string {
911                                                         return logMsg(les) + ": stating"
912                                                 })
913                                                 return
914                                         }
915                                         fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
916                                         state.fdsLock.Lock()
917                                         state.fds[pth] = fdAndFullSize
918                                         state.fdsLock.Unlock()
919                                 }
920                                 fd := fdAndFullSize.fd
921                                 fullSize := fdAndFullSize.fullSize
922                                 lesp = append(lesp, LE{"FullSize", fullSize})
923                                 var bufRead []byte
924                                 if freq.Offset < uint64(fullSize) {
925                                         state.Ctx.LogD("sp-file-seek", lesp, func(les LEs) string {
926                                                 return logMsg(les) + ": seeking"
927                                         })
928                                         if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
929                                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
930                                                         return logMsg(les) + ": seeking"
931                                                 })
932                                                 return
933                                         }
934                                         n, err := fd.Read(buf)
935                                         if err != nil {
936                                                 state.Ctx.LogE("sp-file-read", lesp, err, func(les LEs) string {
937                                                         return logMsg(les) + ": reading"
938                                                 })
939                                                 return
940                                         }
941                                         bufRead = buf[:n]
942                                         lesp = append(
943                                                 les,
944                                                 LE{"XX", string(TTx)},
945                                                 LE{"Pkt", pktName},
946                                                 LE{"Size", int64(n)},
947                                                 LE{"FullSize", fullSize},
948                                         )
949                                         state.Ctx.LogD("sp-file-read", lesp, func(les LEs) string {
950                                                 return fmt.Sprintf(
951                                                         "%s: read %s",
952                                                         logMsg(les), humanize.IBytes(uint64(n)),
953                                                 )
954                                         })
955                                 } else {
956                                         state.closeFd(pth)
957                                 }
958                                 payload = MarshalSP(SPTypeFile, SPFile{
959                                         Hash:    freq.Hash,
960                                         Offset:  freq.Offset,
961                                         Payload: bufRead,
962                                 })
963                                 ourSize := freq.Offset + uint64(len(bufRead))
964                                 lesp = append(
965                                         les,
966                                         LE{"XX", string(TTx)},
967                                         LE{"Pkt", pktName},
968                                         LE{"Size", int64(ourSize)},
969                                         LE{"FullSize", fullSize},
970                                 )
971                                 if state.Ctx.ShowPrgrs {
972                                         state.progressBars[pktName] = struct{}{}
973                                         Progress("Tx", lesp)
974                                 }
975                                 if ourSize == uint64(fullSize) {
976                                         state.closeFd(pth)
977                                         state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string {
978                                                 return logMsg(les) + ": finished"
979                                         })
980                                         if state.Ctx.ShowPrgrs {
981                                                 delete(state.progressBars, pktName)
982                                         }
983                                 }
984                                 state.Lock()
985                                 for i, q := range state.queueTheir {
986                                         if *q.freq.Hash != *freq.Hash {
987                                                 continue
988                                         }
989                                         if ourSize == uint64(fullSize) {
990                                                 state.queueTheir = append(
991                                                         state.queueTheir[:i],
992                                                         state.queueTheir[i+1:]...,
993                                                 )
994                                         } else {
995                                                 q.freq.Offset = ourSize
996                                         }
997                                         break
998                                 }
999                                 state.Unlock()
1000                         }
1001                         logMsg := func(les LEs) string {
1002                                 return fmt.Sprintf(
1003                                         "SP with %s (nice %s): sending %s",
1004                                         state.Node.Name, NicenessFmt(state.Nice),
1005                                         humanize.IBytes(uint64(len(payload))),
1006                                 )
1007                         }
1008                         state.Ctx.LogD("sp-sending", append(les, LE{"Size", int64(len(payload))}), logMsg)
1009                         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
1010                         ct, err := state.csOur.Encrypt(nil, nil, payload)
1011                         if err != nil {
1012                                 state.Ctx.LogE("sp-encrypting", les, err, logMsg)
1013                                 return
1014                         }
1015                         if err := state.WriteSP(conn, ct, ping); err != nil {
1016                                 state.Ctx.LogE("sp-sending", les, err, logMsg)
1017                                 return
1018                         }
1019                 }
1020         }()
1021
1022         // Receiver
1023         state.wg.Add(1)
1024         go func() {
1025                 for {
1026                         if state.NotAlive() {
1027                                 break
1028                         }
1029                         logMsg := func(les LEs) string {
1030                                 return fmt.Sprintf(
1031                                         "SP with %s (nice %s): waiting for payload",
1032                                         state.Node.Name, NicenessFmt(state.Nice),
1033                                 )
1034                         }
1035                         state.Ctx.LogD("sp-recv-wait", les, logMsg)
1036                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
1037                         payload, err := state.ReadSP(conn)
1038                         if err != nil {
1039                                 if err == io.EOF {
1040                                         break
1041                                 }
1042                                 unmarshalErr := err.(*xdr.UnmarshalError)
1043                                 if os.IsTimeout(unmarshalErr.Err) {
1044                                         continue
1045                                 }
1046                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
1047                                         break
1048                                 }
1049                                 state.Ctx.LogE("sp-recv-wait", les, err, logMsg)
1050                                 break
1051                         }
1052                         logMsg = func(les LEs) string {
1053                                 return fmt.Sprintf(
1054                                         "SP with %s (nice %s): payload (%s)",
1055                                         state.Node.Name, NicenessFmt(state.Nice),
1056                                         humanize.IBytes(uint64(len(payload))),
1057                                 )
1058                         }
1059                         state.Ctx.LogD(
1060                                 "sp-recv-got",
1061                                 append(les, LE{"Size", int64(len(payload))}),
1062                                 func(les LEs) string { return logMsg(les) + ": got" },
1063                         )
1064                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
1065                         if err != nil {
1066                                 state.Ctx.LogE("sp-recv-got", les, err, func(les LEs) string {
1067                                         return logMsg(les) + ": got"
1068                                 })
1069                                 break
1070                         }
1071                         state.Ctx.LogD(
1072                                 "sp-recv-process",
1073                                 append(les, LE{"Size", int64(len(payload))}),
1074                                 func(les LEs) string {
1075                                         return logMsg(les) + ": processing"
1076                                 },
1077                         )
1078                         replies, err := state.ProcessSP(payload)
1079                         if err != nil {
1080                                 state.Ctx.LogE("sp-recv-process", les, err, func(les LEs) string {
1081                                         return logMsg(les) + ": processing"
1082                                 })
1083                                 break
1084                         }
1085                         state.wg.Add(1)
1086                         go func() {
1087                                 for _, reply := range replies {
1088                                         state.Ctx.LogD(
1089                                                 "sp-recv-reply",
1090                                                 append(les[:len(les)-1], LE{"Size", int64(len(reply))}),
1091                                                 func(les LEs) string {
1092                                                         return fmt.Sprintf(
1093                                                                 "SP with %s (nice %s): queuing reply (%s)",
1094                                                                 state.Node.Name, NicenessFmt(state.Nice),
1095                                                                 humanize.IBytes(uint64(len(reply))),
1096                                                         )
1097                                                 },
1098                                         )
1099                                         state.payloads <- reply
1100                                 }
1101                                 state.wg.Done()
1102                         }()
1103                         if state.rxRate > 0 {
1104                                 time.Sleep(time.Second / time.Duration(state.rxRate))
1105                         }
1106                 }
1107                 state.SetDead()
1108                 state.wg.Done()
1109                 state.SetDead()
1110                 conn.Close()
1111         }()
1112
1113         return nil
1114 }
1115
1116 func (state *SPState) Wait() bool {
1117         state.wg.Wait()
1118         close(state.payloads)
1119         close(state.pings)
1120         state.Duration = time.Since(state.started)
1121         state.dirUnlock()
1122         state.RxSpeed = state.RxBytes
1123         state.TxSpeed = state.TxBytes
1124         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
1125         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
1126         if rxDuration > 0 {
1127                 state.RxSpeed = state.RxBytes / rxDuration
1128         }
1129         if txDuration > 0 {
1130                 state.TxSpeed = state.TxBytes / txDuration
1131         }
1132         nothingLeft := len(state.queueTheir) == 0
1133         for _, s := range state.fds {
1134                 nothingLeft = false
1135                 s.fd.Close()
1136         }
1137         for pktName := range state.progressBars {
1138                 ProgressKill(pktName)
1139         }
1140         return nothingLeft
1141 }
1142
1143 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
1144         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
1145         r := bytes.NewReader(payload)
1146         var err error
1147         var replies [][]byte
1148         var infosGot bool
1149         for r.Len() > 0 {
1150                 state.Ctx.LogD("sp-process-unmarshal", les, func(les LEs) string {
1151                         return fmt.Sprintf(
1152                                 "SP with %s (nice %s): unmarshaling header",
1153                                 state.Node.Name, NicenessFmt(state.Nice),
1154                         )
1155                 })
1156                 var head SPHead
1157                 if _, err = xdr.Unmarshal(r, &head); err != nil {
1158                         state.Ctx.LogE("sp-process-unmarshal", les, err, func(les LEs) string {
1159                                 return fmt.Sprintf(
1160                                         "SP with %s (nice %s): unmarshaling header",
1161                                         state.Node.Name, NicenessFmt(state.Nice),
1162                                 )
1163                         })
1164                         return nil, err
1165                 }
1166                 if head.Type != SPTypePing {
1167                         state.RxLastNonPing = state.RxLastSeen
1168                 }
1169                 switch head.Type {
1170                 case SPTypeHalt:
1171                         state.Ctx.LogD(
1172                                 "sp-process-halt",
1173                                 append(les, LE{"Type", "halt"}), func(les LEs) string {
1174                                         return fmt.Sprintf(
1175                                                 "SP with %s (nice %s): got HALT",
1176                                                 state.Node.Name, NicenessFmt(state.Nice),
1177                                         )
1178                                 },
1179                         )
1180                         state.Lock()
1181                         state.queueTheir = nil
1182                         state.Unlock()
1183
1184                 case SPTypePing:
1185                         state.Ctx.LogD(
1186                                 "sp-process-ping",
1187                                 append(les, LE{"Type", "ping"}),
1188                                 func(les LEs) string {
1189                                         return fmt.Sprintf(
1190                                                 "SP with %s (nice %s): got PING",
1191                                                 state.Node.Name, NicenessFmt(state.Nice),
1192                                         )
1193                                 },
1194                         )
1195
1196                 case SPTypeInfo:
1197                         infosGot = true
1198                         lesp := append(les, LE{"Type", "info"})
1199                         state.Ctx.LogD(
1200                                 "sp-process-info-unmarshal", lesp,
1201                                 func(les LEs) string {
1202                                         return fmt.Sprintf(
1203                                                 "SP with %s (nice %s): unmarshaling INFO",
1204                                                 state.Node.Name, NicenessFmt(state.Nice),
1205                                         )
1206                                 },
1207                         )
1208                         var info SPInfo
1209                         if _, err = xdr.Unmarshal(r, &info); err != nil {
1210                                 state.Ctx.LogE(
1211                                         "sp-process-info-unmarshal", lesp, err,
1212                                         func(les LEs) string {
1213                                                 return fmt.Sprintf(
1214                                                         "SP with %s (nice %s): unmarshaling INFO",
1215                                                         state.Node.Name, NicenessFmt(state.Nice),
1216                                                 )
1217                                         },
1218                                 )
1219                                 return nil, err
1220                         }
1221                         pktName := Base32Codec.EncodeToString(info.Hash[:])
1222                         lesp = append(
1223                                 lesp,
1224                                 LE{"Pkt", pktName},
1225                                 LE{"Size", int64(info.Size)},
1226                                 LE{"PktNice", int(info.Nice)},
1227                         )
1228                         logMsg := func(les LEs) string {
1229                                 return fmt.Sprintf(
1230                                         "SP with %s (nice %s): INFO %s (%s) nice %s",
1231                                         state.Node.Name, NicenessFmt(state.Nice),
1232                                         pktName,
1233                                         humanize.IBytes(info.Size),
1234                                         NicenessFmt(info.Nice),
1235                                 )
1236                         }
1237                         if !state.listOnly && info.Nice > state.Nice {
1238                                 state.Ctx.LogD("sp-process-info-too-nice", lesp, func(les LEs) string {
1239                                         return logMsg(les) + ": too nice"
1240                                 })
1241                                 continue
1242                         }
1243                         state.Ctx.LogD("sp-process-info-got", lesp, func(les LEs) string {
1244                                 return logMsg(les) + ": received"
1245                         })
1246                         if !state.listOnly && state.xxOnly == TTx {
1247                                 continue
1248                         }
1249                         state.Lock()
1250                         state.infosTheir[*info.Hash] = &info
1251                         state.Unlock()
1252                         state.Ctx.LogD("sp-process-info-stat", lesp, func(les LEs) string {
1253                                 return logMsg(les) + ": stating part"
1254                         })
1255                         pktPath := filepath.Join(
1256                                 state.Ctx.Spool,
1257                                 state.Node.Id.String(),
1258                                 string(TRx),
1259                                 Base32Codec.EncodeToString(info.Hash[:]),
1260                         )
1261                         logMsg = func(les LEs) string {
1262                                 return fmt.Sprintf(
1263                                         "Packet %s (%s) (nice %s)",
1264                                         pktName,
1265                                         humanize.IBytes(info.Size),
1266                                         NicenessFmt(info.Nice),
1267                                 )
1268                         }
1269                         if _, err = os.Stat(pktPath); err == nil {
1270                                 state.Ctx.LogI("sp-info-done", lesp, func(les LEs) string {
1271                                         return logMsg(les) + ": already done"
1272                                 })
1273                                 if !state.listOnly {
1274                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1275                                 }
1276                                 continue
1277                         }
1278                         if _, err = os.Stat(filepath.Join(
1279                                 state.Ctx.Spool, state.Node.Id.String(), string(TRx),
1280                                 SeenDir, Base32Codec.EncodeToString(info.Hash[:]),
1281                         )); err == nil {
1282                                 state.Ctx.LogI("sp-info-seen", lesp, func(les LEs) string {
1283                                         return logMsg(les) + ": already seen"
1284                                 })
1285                                 if !state.listOnly {
1286                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1287                                 }
1288                                 continue
1289                         }
1290                         if _, err = os.Stat(pktPath + NoCKSuffix); err == nil {
1291                                 state.Ctx.LogI("sp-info-nock", lesp, func(les LEs) string {
1292                                         return logMsg(les) + ": still not checksummed"
1293                                 })
1294                                 continue
1295                         }
1296                         fi, err := os.Stat(pktPath + PartSuffix)
1297                         var offset int64
1298                         if err == nil {
1299                                 offset = fi.Size()
1300                         }
1301                         if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
1302                                 state.Ctx.LogI("sp-info-no-space", lesp, func(les LEs) string {
1303                                         return logMsg(les) + ": not enough space"
1304                                 })
1305                                 continue
1306                         }
1307                         state.Ctx.LogI(
1308                                 "sp-info",
1309                                 append(lesp, LE{"Offset", offset}),
1310                                 func(les LEs) string {
1311                                         return fmt.Sprintf(
1312                                                 "%s: %d%%", logMsg(les), 100*uint64(offset)/info.Size,
1313                                         )
1314                                 },
1315                         )
1316                         if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
1317                                 replies = append(replies, MarshalSP(
1318                                         SPTypeFreq,
1319                                         SPFreq{info.Hash, uint64(offset)},
1320                                 ))
1321                         }
1322
1323                 case SPTypeFile:
1324                         lesp := append(les, LE{"Type", "file"})
1325                         state.Ctx.LogD("sp-process-file", lesp, func(les LEs) string {
1326                                 return fmt.Sprintf(
1327                                         "SP with %s (nice %s): unmarshaling FILE",
1328                                         state.Node.Name, NicenessFmt(state.Nice),
1329                                 )
1330                         })
1331                         var file SPFile
1332                         if _, err = xdr.Unmarshal(r, &file); err != nil {
1333                                 state.Ctx.LogE("sp-process-file", lesp, err, func(les LEs) string {
1334                                         return fmt.Sprintf(
1335                                                 "SP with %s (nice %s): unmarshaling FILE",
1336                                                 state.Node.Name, NicenessFmt(state.Nice),
1337                                         )
1338                                 })
1339                                 return nil, err
1340                         }
1341                         pktName := Base32Codec.EncodeToString(file.Hash[:])
1342                         lesp = append(
1343                                 lesp,
1344                                 LE{"XX", string(TRx)},
1345                                 LE{"Pkt", pktName},
1346                                 LE{"Size", int64(len(file.Payload))},
1347                         )
1348                         logMsg := func(les LEs) string {
1349                                 return fmt.Sprintf(
1350                                         "Got packet %s (%s)",
1351                                         pktName, humanize.IBytes(uint64(len(file.Payload))),
1352                                 )
1353                         }
1354                         fullsize := int64(0)
1355                         state.RLock()
1356                         infoTheir := state.infosTheir[*file.Hash]
1357                         state.RUnlock()
1358                         if infoTheir == nil {
1359                                 state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string {
1360                                         return logMsg(les) + ": unknown file"
1361                                 })
1362                                 continue
1363                         }
1364                         fullsize = int64(infoTheir.Size)
1365                         lesp = append(lesp, LE{"FullSize", fullsize})
1366                         dirToSync := filepath.Join(
1367                                 state.Ctx.Spool,
1368                                 state.Node.Id.String(),
1369                                 string(TRx),
1370                         )
1371                         filePath := filepath.Join(dirToSync, pktName)
1372                         filePathPart := filePath + PartSuffix
1373                         state.Ctx.LogD("sp-file-open", lesp, func(les LEs) string {
1374                                 return logMsg(les) + ": opening part"
1375                         })
1376                         state.fdsLock.RLock()
1377                         fdAndFullSize, exists := state.fds[filePathPart]
1378                         state.fdsLock.RUnlock()
1379                         hasherAndOffset := state.fileHashers[filePath]
1380                         var fd *os.File
1381                         if exists {
1382                                 fd = fdAndFullSize.fd
1383                         } else {
1384                                 fd, err = os.OpenFile(
1385                                         filePathPart,
1386                                         os.O_RDWR|os.O_CREATE,
1387                                         os.FileMode(0666),
1388                                 )
1389                                 if err != nil {
1390                                         state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string {
1391                                                 return logMsg(les) + ": opening part"
1392                                         })
1393                                         return nil, err
1394                                 }
1395                                 state.fdsLock.Lock()
1396                                 state.fds[filePathPart] = FdAndFullSize{fd: fd}
1397                                 state.fdsLock.Unlock()
1398                                 if !state.NoCK {
1399                                         hasherAndOffset = &MTHAndOffset{
1400                                                 mth:    MTHNew(fullsize, int64(file.Offset)),
1401                                                 offset: file.Offset,
1402                                         }
1403                                         state.fileHashers[filePath] = hasherAndOffset
1404                                 }
1405                         }
1406                         state.Ctx.LogD(
1407                                 "sp-file-seek",
1408                                 append(lesp, LE{"Offset", file.Offset}),
1409                                 func(les LEs) string {
1410                                         return fmt.Sprintf("%s: seeking %d", logMsg(les), file.Offset)
1411                                 })
1412                         if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
1413                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1414                                         return logMsg(les) + ": seeking"
1415                                 })
1416                                 state.closeFd(filePathPart)
1417                                 return nil, err
1418                         }
1419                         state.Ctx.LogD("sp-file-write", lesp, func(les LEs) string {
1420                                 return logMsg(les) + ": writing"
1421                         })
1422                         if _, err = fd.Write(file.Payload); err != nil {
1423                                 state.Ctx.LogE("sp-file-write", lesp, err, func(les LEs) string {
1424                                         return logMsg(les) + ": writing"
1425                                 })
1426                                 state.closeFd(filePathPart)
1427                                 return nil, err
1428                         }
1429                         if hasherAndOffset != nil {
1430                                 if hasherAndOffset.offset == file.Offset {
1431                                         if _, err = hasherAndOffset.mth.Write(file.Payload); err != nil {
1432                                                 panic(err)
1433                                         }
1434                                         hasherAndOffset.offset += uint64(len(file.Payload))
1435                                 } else {
1436                                         state.Ctx.LogE(
1437                                                 "sp-file-offset-differs", lesp, errors.New("offset differs"),
1438                                                 func(les LEs) string {
1439                                                         return logMsg(les) + ": deleting hasher"
1440                                                 },
1441                                         )
1442                                         delete(state.fileHashers, filePath)
1443                                         hasherAndOffset = nil
1444                                 }
1445                         }
1446                         ourSize := int64(file.Offset + uint64(len(file.Payload)))
1447                         lesp[len(lesp)-2].V = ourSize
1448                         if state.Ctx.ShowPrgrs {
1449                                 state.progressBars[pktName] = struct{}{}
1450                                 Progress("Rx", lesp)
1451                         }
1452                         if fullsize != ourSize {
1453                                 continue
1454                         }
1455                         if state.Ctx.ShowPrgrs {
1456                                 delete(state.progressBars, pktName)
1457                         }
1458                         logMsg = func(les LEs) string {
1459                                 return fmt.Sprintf(
1460                                         "Got packet %s %d%% (%s / %s)",
1461                                         pktName, 100*ourSize/fullsize,
1462                                         humanize.IBytes(uint64(ourSize)),
1463                                         humanize.IBytes(uint64(fullsize)),
1464                                 )
1465                         }
1466                         if !NoSync {
1467                                 err = fd.Sync()
1468                                 if err != nil {
1469                                         state.Ctx.LogE("sp-file-sync", lesp, err, func(les LEs) string {
1470                                                 return logMsg(les) + ": syncing"
1471                                         })
1472                                         state.closeFd(filePathPart)
1473                                         continue
1474                                 }
1475                         }
1476                         if hasherAndOffset != nil {
1477                                 delete(state.fileHashers, filePath)
1478                                 if hasherAndOffset.mth.PreaddSize() == 0 {
1479                                         if !bytes.Equal(hasherAndOffset.mth.Sum(nil), file.Hash[:]) {
1480                                                 state.Ctx.LogE(
1481                                                         "sp-file-bad-checksum", lesp,
1482                                                         errors.New("checksum mismatch"),
1483                                                         logMsg,
1484                                                 )
1485                                                 state.closeFd(filePathPart)
1486                                                 continue
1487                                         }
1488                                         if err = os.Rename(filePathPart, filePath); err != nil {
1489                                                 state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1490                                                         return logMsg(les) + ": renaming"
1491                                                 })
1492                                                 state.closeFd(filePathPart)
1493                                                 continue
1494                                         }
1495                                         if err = DirSync(dirToSync); err != nil {
1496                                                 state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1497                                                         return logMsg(les) + ": dirsyncing"
1498                                                 })
1499                                                 state.closeFd(filePathPart)
1500                                                 continue
1501                                         }
1502                                         state.Ctx.LogI("sp-file-done", lesp, func(les LEs) string {
1503                                                 return logMsg(les) + ": done"
1504                                         })
1505                                         state.wg.Add(1)
1506                                         go func() {
1507                                                 state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
1508                                                 state.wg.Done()
1509                                         }()
1510                                         state.Lock()
1511                                         delete(state.infosTheir, *file.Hash)
1512                                         state.Unlock()
1513                                         if !state.Ctx.HdrUsage {
1514                                                 continue
1515                                         }
1516                                         if _, err = fd.Seek(0, io.SeekStart); err != nil {
1517                                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1518                                                         return logMsg(les) + ": seeking"
1519                                                 })
1520                                                 state.closeFd(filePathPart)
1521                                                 continue
1522                                         }
1523                                         _, pktEncRaw, err := state.Ctx.HdrRead(fd)
1524                                         state.closeFd(filePathPart)
1525                                         if err != nil {
1526                                                 state.Ctx.LogE("sp-file-hdr-read", lesp, err, func(les LEs) string {
1527                                                         return logMsg(les) + ": HdrReading"
1528                                                 })
1529                                                 continue
1530                                         }
1531                                         state.Ctx.HdrWrite(pktEncRaw, filePath)
1532                                         continue
1533                                 }
1534                         }
1535                         state.closeFd(filePathPart)
1536                         if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil {
1537                                 state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1538                                         return logMsg(les) + ": renaming"
1539                                 })
1540                                 continue
1541                         }
1542                         if err = DirSync(dirToSync); err != nil {
1543                                 state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1544                                         return logMsg(les) + ": dirsyncing"
1545                                 })
1546                                 continue
1547                         }
1548                         state.Ctx.LogI("sp-file-downloaded", lesp, func(les LEs) string {
1549                                 return logMsg(les) + ": downloaded"
1550                         })
1551                         state.Lock()
1552                         delete(state.infosTheir, *file.Hash)
1553                         state.Unlock()
1554                         go func() {
1555                                 t := SPCheckerTask{
1556                                         nodeId: state.Node.Id,
1557                                         hsh:    file.Hash,
1558                                         done:   state.payloads,
1559                                 }
1560                                 if hasherAndOffset != nil {
1561                                         t.mth = hasherAndOffset.mth
1562                                 }
1563                                 spCheckerTasks <- t
1564                         }()
1565
1566                 case SPTypeDone:
1567                         lesp := append(les, LE{"Type", "done"})
1568                         state.Ctx.LogD("sp-process-done-unmarshal", lesp, func(les LEs) string {
1569                                 return fmt.Sprintf(
1570                                         "SP with %s (nice %s): unmarshaling DONE",
1571                                         state.Node.Name, NicenessFmt(state.Nice),
1572                                 )
1573                         })
1574                         var done SPDone
1575                         if _, err = xdr.Unmarshal(r, &done); err != nil {
1576                                 state.Ctx.LogE("sp-process-done-unmarshal", lesp, err, func(les LEs) string {
1577                                         return fmt.Sprintf(
1578                                                 "SP with %s (nice %s): unmarshaling DONE",
1579                                                 state.Node.Name, NicenessFmt(state.Nice),
1580                                         )
1581                                 })
1582                                 return nil, err
1583                         }
1584                         pktName := Base32Codec.EncodeToString(done.Hash[:])
1585                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"XX", string(TTx)})
1586                         logMsg := func(les LEs) string {
1587                                 return fmt.Sprintf(
1588                                         "SP with %s (nice %s): DONE: removing %s",
1589                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1590                                 )
1591                         }
1592                         state.Ctx.LogD("sp-done", lesp, logMsg)
1593                         pth := filepath.Join(
1594                                 state.Ctx.Spool,
1595                                 state.Node.Id.String(),
1596                                 string(TTx),
1597                                 pktName,
1598                         )
1599                         if err = os.Remove(pth); err == nil {
1600                                 state.Ctx.LogI("sp-done", lesp, func(les LEs) string {
1601                                         return fmt.Sprintf("Packet %s is sent", pktName)
1602                                 })
1603                                 if state.Ctx.HdrUsage {
1604                                         os.Remove(JobPath2Hdr(pth))
1605                                 }
1606                         } else {
1607                                 state.Ctx.LogE("sp-done", lesp, err, logMsg)
1608                         }
1609
1610                 case SPTypeFreq:
1611                         lesp := append(les, LE{"Type", "freq"})
1612                         state.Ctx.LogD("sp-process-freq", lesp, func(les LEs) string {
1613                                 return fmt.Sprintf(
1614                                         "SP with %s (nice %s): unmarshaling FREQ",
1615                                         state.Node.Name, NicenessFmt(state.Nice),
1616                                 )
1617                         })
1618                         var freq SPFreq
1619                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
1620                                 state.Ctx.LogE("sp-process-freq", lesp, err, func(les LEs) string {
1621                                         return fmt.Sprintf(
1622                                                 "SP with %s (nice %s): unmarshaling FREQ",
1623                                                 state.Node.Name, NicenessFmt(state.Nice),
1624                                         )
1625                                 })
1626                                 return nil, err
1627                         }
1628                         pktName := Base32Codec.EncodeToString(freq.Hash[:])
1629                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"Offset", freq.Offset})
1630                         state.Ctx.LogD("sp-process-freq-queueing", lesp, func(les LEs) string {
1631                                 return fmt.Sprintf(
1632                                         "SP with %s (nice %s): FREQ %s: queuing",
1633                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1634                                 )
1635                         })
1636                         nice, exists := state.infosOurSeen[*freq.Hash]
1637                         if exists {
1638                                 if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
1639                                         state.Lock()
1640                                         insertIdx := 0
1641                                         var freqWithNice *FreqWithNice
1642                                         for insertIdx, freqWithNice = range state.queueTheir {
1643                                                 if freqWithNice.nice > nice {
1644                                                         break
1645                                                 }
1646                                         }
1647                                         state.queueTheir = append(state.queueTheir, nil)
1648                                         copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
1649                                         state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
1650                                         state.Unlock()
1651                                 } else {
1652                                         state.Ctx.LogD("sp-process-freq-skip", lesp, func(les LEs) string {
1653                                                 return fmt.Sprintf(
1654                                                         "SP with %s (nice %s): FREQ %s: skipping",
1655                                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1656                                                 )
1657                                         })
1658                                 }
1659                         } else {
1660                                 state.Ctx.LogD("sp-process-freq-unknown", lesp, func(les LEs) string {
1661                                         return fmt.Sprintf(
1662                                                 "SP with %s (nice %s): FREQ %s: unknown",
1663                                                 state.Node.Name, NicenessFmt(state.Nice), pktName,
1664                                         )
1665                                 })
1666                         }
1667
1668                 default:
1669                         state.Ctx.LogE(
1670                                 "sp-process-type-unknown",
1671                                 append(les, LE{"Type", head.Type}),
1672                                 errors.New("unknown type"),
1673                                 func(les LEs) string {
1674                                         return fmt.Sprintf(
1675                                                 "SP with %s (nice %s): %d",
1676                                                 state.Node.Name, NicenessFmt(state.Nice), head.Type,
1677                                         )
1678                                 },
1679                         )
1680                         return nil, BadPktType
1681                 }
1682         }
1683
1684         if infosGot {
1685                 var pkts int
1686                 var size uint64
1687                 state.RLock()
1688                 for _, info := range state.infosTheir {
1689                         pkts++
1690                         size += info.Size
1691                 }
1692                 state.RUnlock()
1693                 state.Ctx.LogI("sp-infos-rx", LEs{
1694                         {"XX", string(TRx)},
1695                         {"Node", state.Node.Id},
1696                         {"Pkts", pkts},
1697                         {"Size", int64(size)},
1698                 }, func(les LEs) string {
1699                         return fmt.Sprintf(
1700                                 "%s has got for us: %d packets, %s",
1701                                 state.Node.Name, pkts, humanize.IBytes(size),
1702                         )
1703                 })
1704         }
1705         return payloadsSplit(replies), nil
1706 }
1707
1708 func SPChecker(ctx *Ctx) {
1709         for t := range spCheckerTasks {
1710                 pktName := Base32Codec.EncodeToString(t.hsh[:])
1711                 les := LEs{
1712                         {"XX", string(TRx)},
1713                         {"Node", t.nodeId},
1714                         {"Pkt", pktName},
1715                 }
1716                 SPCheckerWg.Add(1)
1717                 ctx.LogD("sp-checker", les, func(les LEs) string {
1718                         return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(t.nodeId), pktName)
1719                 })
1720                 size, err := ctx.CheckNoCK(t.nodeId, t.hsh, t.mth)
1721                 les = append(les, LE{"Size", size})
1722                 if err != nil {
1723                         ctx.LogE("sp-checker", les, err, func(les LEs) string {
1724                                 return fmt.Sprintf(
1725                                         "Checksumming %s/rx/%s (%s)", ctx.NodeName(t.nodeId), pktName,
1726                                         humanize.IBytes(uint64(size)),
1727                                 )
1728                         })
1729                         SPCheckerWg.Done()
1730                         continue
1731                 }
1732                 ctx.LogI("sp-checker-done", les, func(les LEs) string {
1733                         return fmt.Sprintf(
1734                                 "Packet %s is retreived (%s)",
1735                                 pktName, humanize.IBytes(uint64(size)),
1736                         )
1737                 })
1738                 SPCheckerWg.Done()
1739                 go func(t SPCheckerTask) {
1740                         defer func() { recover() }()
1741                         t.done <- MarshalSP(SPTypeDone, SPDone{t.hsh})
1742                 }(t)
1743         }
1744 }