Cypherpunks.ru repositories - nncp.git/blob - src/sp.go
Bad exit code if queues are not empty
[nncp.git] / src / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2022 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bytes"
22         "crypto/subtle"
23         "errors"
24         "fmt"
25         "io"
26         "log"
27         "os"
28         "path/filepath"
29         "sort"
30         "strconv"
31         "sync"
32         "time"
33
34         xdr "github.com/davecgh/go-xdr/xdr2"
35         "github.com/dustin/go-humanize"
36         "github.com/flynn/noise"
37 )
38
const (
	// MaxSPSize is the size limit used when batching payloads together
	// (payloadsSplit) and when padding the first handshake payload.
	MaxSPSize      = 1<<16 - 256
	// PartSuffix is a file name suffix; its use is not visible in this
	// part of the file (presumably marks partially received files -- verify).
	PartSuffix     = ".part"
	// SPHeadOverhead is the number of bytes reserved for a marshalled
	// SPHead (init() allocates the pre-marshalled heads with this length).
	SPHeadOverhead = 4
	// CfgDeadline is the name of the environment variable that overrides
	// DefaultDeadline (value is an integer number of seconds).
	CfgDeadline    = "NNCPDEADLINE"
)
45
// MTHAndOffset pairs an MTH value with an offset. It is stored per path in
// SPState.fileHashers.
// NOTE(review): offset presumably is the amount of data already fed to the
// hasher -- confirm against the code that fills fileHashers.
type MTHAndOffset struct {
	mth    MTH
	offset uint64
}
50
// SPCheckerTask is a unit of work sent over the spCheckerTasks channel.
// StartWorkers creates one per JobsNoCK job, filling nodeId, hsh and done
// (done is the session's payloads channel).
type SPCheckerTask struct {
	nodeId *NodeId
	hsh    *[MTHSize]byte
	mth    MTH
	done   chan []byte
}
57
var (
	// Marshalled sizes of SPInfo, SPFreq and an SPFile with empty Payload;
	// computed once in init().
	SPInfoOverhead    int
	SPFreqOverhead    int
	SPFileOverhead    int
	// Pre-marshalled SPHead packets of Halt and Ping types, each
	// SPHeadOverhead bytes long; filled in init().
	SPHaltMarshalized []byte
	SPPingMarshalized []byte

	// NoiseCipherSuite is the cipher suite used for the Noise handshake:
	// DH25519, ChaChaPoly, BLAKE2b.
	NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
		noise.DH25519,
		noise.CipherChaChaPoly,
		noise.HashBLAKE2b,
	)

	// DefaultDeadline is the read/write deadline applied to handshake
	// messages. It can be overridden through the CfgDeadline environment
	// variable, see init().
	DefaultDeadline = 10 * time.Second
	// PingTimeout is the period of the ping ticker; a peer silent for
	// 2*PingTimeout is treated as dead by the periodic worker.
	PingTimeout     = time.Minute

	// spCheckerTasks is the unbuffered queue of SPCheckerTask, created in
	// init().
	spCheckerTasks chan SPCheckerTask
	// NOTE(review): SPCheckerWg is not used in the visible part of the
	// file; presumably it tracks outstanding checker tasks -- confirm.
	SPCheckerWg    sync.WaitGroup
	// spCheckerOnce guarantees that the SPChecker goroutine is started
	// only once.
	spCheckerOnce  sync.Once
)
78
// FdAndFullSize keeps an opened file together with a size value. Entries
// live in SPState.fds, keyed by path.
// NOTE(review): fullSize presumably is the total size of the file -- confirm
// against the code that fills SPState.fds.
type FdAndFullSize struct {
	fd       *os.File
	fullSize int64
}
83
// SPType identifies the kind of an SP packet; it is the only field of
// SPHead, which precedes every packet body.
type SPType uint8

// Known packet types. Values are consecutive, starting from zero, and are
// part of the wire format, so the order must not be changed.
const (
	SPTypeInfo SPType = iota
	SPTypeFreq
	SPTypeFile
	SPTypeDone
	SPTypeHalt
	SPTypePing
)
94
// SPHead is the packet header: MarshalSP marshals it in front of every
// packet body.
type SPHead struct {
	Type SPType
}
98
// SPInfo announces a single packet: its niceness, size and hash.
// infosOur() builds one for each outgoing (tx) job.
type SPInfo struct {
	Nice uint8
	Size uint64
	Hash *[MTHSize]byte
}
104
// SPFreq references a packet by its hash together with an offset.
// NOTE(review): presumably a request to send the packet starting from
// Offset -- the code handling it is outside of the visible part.
type SPFreq struct {
	Hash   *[MTHSize]byte
	Offset uint64
}
109
// SPFile carries a piece of data (Payload) related to the packet with the
// given hash, at the given offset.
// NOTE(review): exact semantics are defined by code outside of the visible
// part -- confirm.
type SPFile struct {
	Hash    *[MTHSize]byte
	Offset  uint64
	Payload []byte
}
115
// SPDone references a packet by its hash.
// NOTE(review): presumably an acknowledgement of a completely received
// packet -- the code handling it is outside of the visible part.
type SPDone struct {
	Hash *[MTHSize]byte
}
119
// SPRaw is the outermost frame written to and read from the connection:
// a magic number followed by an opaque payload (see WriteSP and ReadSP).
type SPRaw struct {
	Magic   [8]byte
	Payload []byte
}
124
// FreqWithNice is an SPFreq accompanied by a niceness value. SPState keeps
// a slice of them in queueTheir.
type FreqWithNice struct {
	freq *SPFreq
	nice uint8
}
129
// ConnDeadlined is the connection abstraction the SP code works with:
// a read/write/close stream that additionally supports read and write
// deadlines.
type ConnDeadlined interface {
	io.ReadWriteCloser
	SetReadDeadline(t time.Time) error
	SetWriteDeadline(t time.Time) error
}
135
136 func init() {
137         if v := os.Getenv(CfgDeadline); v != "" {
138                 i, err := strconv.Atoi(v)
139                 if err != nil {
140                         log.Fatalln("Can not convert", CfgDeadline, "to integer:", err)
141                 }
142                 DefaultDeadline = time.Duration(i) * time.Second
143         }
144
145         var buf bytes.Buffer
146         spHead := SPHead{Type: SPTypeHalt}
147         if _, err := xdr.Marshal(&buf, spHead); err != nil {
148                 panic(err)
149         }
150         SPHaltMarshalized = make([]byte, SPHeadOverhead)
151         copy(SPHaltMarshalized, buf.Bytes())
152         buf.Reset()
153
154         spHead = SPHead{Type: SPTypePing}
155         if _, err := xdr.Marshal(&buf, spHead); err != nil {
156                 panic(err)
157         }
158         SPPingMarshalized = make([]byte, SPHeadOverhead)
159         copy(SPPingMarshalized, buf.Bytes())
160         buf.Reset()
161
162         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([MTHSize]byte)}
163         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
164                 panic(err)
165         }
166         SPInfoOverhead = buf.Len()
167         buf.Reset()
168
169         spFreq := SPFreq{Hash: new([MTHSize]byte), Offset: 123}
170         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
171                 panic(err)
172         }
173         SPFreqOverhead = buf.Len()
174         buf.Reset()
175
176         spFile := SPFile{Hash: new([MTHSize]byte), Offset: 123}
177         if _, err := xdr.Marshal(&buf, spFile); err != nil {
178                 panic(err)
179         }
180         SPFileOverhead = buf.Len()
181         spCheckerTasks = make(chan SPCheckerTask)
182 }
183
184 func MarshalSP(typ SPType, sp interface{}) []byte {
185         var buf bytes.Buffer
186         if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil {
187                 panic(err)
188         }
189         if _, err := xdr.Marshal(&buf, sp); err != nil {
190                 panic(err)
191         }
192         return buf.Bytes()
193 }
194
195 func payloadsSplit(payloads [][]byte) [][]byte {
196         var outbounds [][]byte
197         outbound := make([]byte, 0, MaxSPSize)
198         for i, payload := range payloads {
199                 outbound = append(outbound, payload...)
200                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
201                         outbounds = append(outbounds, outbound)
202                         outbound = make([]byte, 0, MaxSPSize)
203                 }
204         }
205         if len(outbound) > 0 {
206                 outbounds = append(outbounds, outbound)
207         }
208         return outbounds
209 }
210
// SPState holds everything related to a single SP session with a node:
// configuration, Noise handshake/cipher states, work queues, traffic
// statistics, directory locks and opened files. The embedded RWMutex is
// taken by SetDead.
type SPState struct {
	Ctx            *Ctx
	Node           *Node
	Nice           uint8
	NoCK           bool // do not start the checker in StartWorkers
	onlineDeadline time.Duration
	maxOnlineTime  time.Duration // if > 0, limits the session duration
	hs             *noise.HandshakeState
	csOur          *noise.CipherState
	csTheir        *noise.CipherState
	payloads       chan []byte   // payloads queued for sending
	pings          chan struct{} // ping requests for the sender worker
	infosTheir     map[[MTHSize]byte]*SPInfo
	infosOurSeen   map[[MTHSize]byte]uint8 // hashes already collected by infosOur
	queueTheir     []*FreqWithNice
	wg             sync.WaitGroup // tracks session's worker goroutines
	RxBytes        int64
	RxLastSeen     time.Time
	RxLastNonPing  time.Time
	TxBytes        int64
	TxLastSeen     time.Time
	TxLastNonPing  time.Time
	started        time.Time
	mustFinishAt   time.Time // started + maxOnlineTime
	Duration       time.Duration
	RxSpeed        int64
	TxSpeed        int64
	rxLock         *os.File // rx directory lock, released by dirUnlock
	txLock         *os.File // tx directory lock, released by dirUnlock
	xxOnly         TRxTx    // if non-empty, only that direction is processed
	rxRate         int
	txRate         int
	isDead         chan struct{} // closed by SetDead
	listOnly       bool
	onlyPkts       map[[MTHSize]byte]bool
	writeSPBuf     bytes.Buffer // scratch buffer reused by WriteSP
	fds            map[string]FdAndFullSize
	fdsLock        sync.RWMutex // guards fds
	fileHashers    map[string]*MTHAndOffset
	progressBars   map[string]struct{}
	sync.RWMutex
}
253
254 func (state *SPState) SetDead() {
255         state.Lock()
256         defer state.Unlock()
257         select {
258         case <-state.isDead:
259                 // Already closed channel, dead
260                 return
261         default:
262         }
263         close(state.isDead)
264         go func() {
265                 for range state.payloads {
266                 }
267         }()
268         go func() {
269                 for range state.pings {
270                 }
271         }()
272 }
273
274 func (state *SPState) NotAlive() bool {
275         select {
276         case <-state.isDead:
277                 return true
278         default:
279         }
280         return false
281 }
282
283 func (state *SPState) dirUnlock() {
284         state.Ctx.UnlockDir(state.rxLock)
285         state.Ctx.UnlockDir(state.txLock)
286 }
287
288 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
289         state.writeSPBuf.Reset()
290         n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
291                 Magic:   MagicNNCPSv1.B,
292                 Payload: payload,
293         })
294         if err != nil {
295                 return err
296         }
297         if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil {
298                 state.TxLastSeen = time.Now()
299                 state.TxBytes += int64(n)
300                 if !ping {
301                         state.TxLastNonPing = state.TxLastSeen
302                 }
303         }
304         return err
305 }
306
307 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
308         var sp SPRaw
309         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
310         if err != nil {
311                 ue := err.(*xdr.UnmarshalError)
312                 if ue.Err == io.EOF {
313                         return nil, ue.Err
314                 }
315                 return nil, err
316         }
317         state.RxLastSeen = time.Now()
318         state.RxBytes += int64(n)
319         if sp.Magic != MagicNNCPSv1.B {
320                 return nil, BadMagic
321         }
322         return sp.Payload, nil
323 }
324
325 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[MTHSize]byte]uint8) [][]byte {
326         var infos []*SPInfo
327         var totalSize int64
328         for job := range ctx.Jobs(nodeId, TTx) {
329                 if job.PktEnc.Nice > nice {
330                         continue
331                 }
332                 if _, known := (*seen)[*job.HshValue]; known {
333                         continue
334                 }
335                 totalSize += job.Size
336                 infos = append(infos, &SPInfo{
337                         Nice: job.PktEnc.Nice,
338                         Size: uint64(job.Size),
339                         Hash: job.HshValue,
340                 })
341                 (*seen)[*job.HshValue] = job.PktEnc.Nice
342         }
343         sort.Sort(ByNice(infos))
344         var payloads [][]byte
345         for _, info := range infos {
346                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
347                 pktName := Base32Codec.EncodeToString(info.Hash[:])
348                 ctx.LogD("sp-info-our", LEs{
349                         {"Node", nodeId},
350                         {"Name", pktName},
351                         {"Size", info.Size},
352                 }, func(les LEs) string {
353                         return fmt.Sprintf(
354                                 "Our info: %s/tx/%s (%s)",
355                                 ctx.NodeName(nodeId),
356                                 pktName,
357                                 humanize.IBytes(info.Size),
358                         )
359                 })
360         }
361         if totalSize > 0 {
362                 ctx.LogI("sp-infos-tx", LEs{
363                         {"XX", string(TTx)},
364                         {"Node", nodeId},
365                         {"Pkts", len(payloads)},
366                         {"Size", totalSize},
367                 }, func(les LEs) string {
368                         return fmt.Sprintf(
369                                 "We have got for %s: %d packets, %s",
370                                 ctx.NodeName(nodeId),
371                                 len(payloads),
372                                 humanize.IBytes(uint64(totalSize)),
373                         )
374                 })
375         }
376         return payloadsSplit(payloads)
377 }
378
379 func (state *SPState) StartI(conn ConnDeadlined) error {
380         nodeId := state.Node.Id
381         err := state.Ctx.ensureRxDir(nodeId)
382         if err != nil {
383                 return err
384         }
385         var rxLock *os.File
386         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
387                 rxLock, err = state.Ctx.LockDir(nodeId, string(TRx))
388                 if err != nil {
389                         return err
390                 }
391         }
392         var txLock *os.File
393         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
394                 txLock, err = state.Ctx.LockDir(nodeId, string(TTx))
395                 if err != nil {
396                         return err
397                 }
398         }
399         started := time.Now()
400         conf := noise.Config{
401                 CipherSuite: NoiseCipherSuite,
402                 Pattern:     noise.HandshakeIK,
403                 Initiator:   true,
404                 StaticKeypair: noise.DHKey{
405                         Private: state.Ctx.Self.NoisePrv[:],
406                         Public:  state.Ctx.Self.NoisePub[:],
407                 },
408                 PeerStatic: state.Node.NoisePub[:],
409         }
410         hs, err := noise.NewHandshakeState(conf)
411         if err != nil {
412                 return err
413         }
414         state.hs = hs
415         state.payloads = make(chan []byte)
416         state.pings = make(chan struct{})
417         state.infosTheir = make(map[[MTHSize]byte]*SPInfo)
418         state.infosOurSeen = make(map[[MTHSize]byte]uint8)
419         state.progressBars = make(map[string]struct{})
420         state.started = started
421         state.rxLock = rxLock
422         state.txLock = txLock
423
424         var infosPayloads [][]byte
425         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
426                 infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
427         }
428         var firstPayload []byte
429         if len(infosPayloads) > 0 {
430                 firstPayload = infosPayloads[0]
431         }
432         // Pad first payload, to hide actual number of existing files
433         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
434                 firstPayload = append(firstPayload, SPHaltMarshalized...)
435         }
436
437         var buf []byte
438         var payload []byte
439         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
440         if err != nil {
441                 state.dirUnlock()
442                 return err
443         }
444         les := LEs{{"Node", nodeId}, {"Nice", int(state.Nice)}}
445         state.Ctx.LogD("sp-startI", les, func(les LEs) string {
446                 return fmt.Sprintf(
447                         "SP with %s (nice %s): sending first message",
448                         state.Node.Name,
449                         NicenessFmt(state.Nice),
450                 )
451         })
452         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
453         if err = state.WriteSP(conn, buf, false); err != nil {
454                 state.Ctx.LogE("sp-startI", les, err, func(les LEs) string {
455                         return fmt.Sprintf(
456                                 "SP with %s (nice %s): writing",
457                                 state.Node.Name,
458                                 NicenessFmt(state.Nice),
459                         )
460                 })
461                 state.dirUnlock()
462                 return err
463         }
464         state.Ctx.LogD("sp-startI-wait", les, func(les LEs) string {
465                 return fmt.Sprintf(
466                         "SP with %s (nice %s): waiting for first message",
467                         state.Node.Name,
468                         NicenessFmt(state.Nice),
469                 )
470         })
471         conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
472         if buf, err = state.ReadSP(conn); err != nil {
473                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
474                         return fmt.Sprintf(
475                                 "SP with %s (nice %s): reading",
476                                 state.Node.Name,
477                                 NicenessFmt(state.Nice),
478                         )
479                 })
480                 state.dirUnlock()
481                 return err
482         }
483         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
484         if err != nil {
485                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
486                         return fmt.Sprintf(
487                                 "SP with %s (nice %s): reading Noise message",
488                                 state.Node.Name,
489                                 NicenessFmt(state.Nice),
490                         )
491                 })
492                 state.dirUnlock()
493                 return err
494         }
495         state.Ctx.LogD("sp-startI-workers", les, func(les LEs) string {
496                 return fmt.Sprintf(
497                         "SP with %s (nice %s): starting workers",
498                         state.Node.Name,
499                         NicenessFmt(state.Nice),
500                 )
501         })
502         err = state.StartWorkers(conn, infosPayloads, payload)
503         if err != nil {
504                 state.Ctx.LogE("sp-startI-workers", les, err, func(les LEs) string {
505                         return fmt.Sprintf(
506                                 "SP with %s (nice %s): starting workers",
507                                 state.Node.Name,
508                                 NicenessFmt(state.Nice),
509                         )
510                 })
511                 state.dirUnlock()
512         }
513         return err
514 }
515
516 func (state *SPState) StartR(conn ConnDeadlined) error {
517         started := time.Now()
518         conf := noise.Config{
519                 CipherSuite: NoiseCipherSuite,
520                 Pattern:     noise.HandshakeIK,
521                 Initiator:   false,
522                 StaticKeypair: noise.DHKey{
523                         Private: state.Ctx.Self.NoisePrv[:],
524                         Public:  state.Ctx.Self.NoisePub[:],
525                 },
526         }
527         hs, err := noise.NewHandshakeState(conf)
528         if err != nil {
529                 return err
530         }
531         xxOnly := TRxTx("")
532         state.hs = hs
533         state.payloads = make(chan []byte)
534         state.pings = make(chan struct{})
535         state.infosOurSeen = make(map[[MTHSize]byte]uint8)
536         state.infosTheir = make(map[[MTHSize]byte]*SPInfo)
537         state.progressBars = make(map[string]struct{})
538         state.started = started
539         state.xxOnly = xxOnly
540
541         var buf []byte
542         var payload []byte
543         logMsg := func(les LEs) string {
544                 return fmt.Sprintf(
545                         "SP nice %s: waiting for first message",
546                         NicenessFmt(state.Nice),
547                 )
548         }
549         les := LEs{{"Nice", int(state.Nice)}}
550         state.Ctx.LogD("sp-startR", les, logMsg)
551         conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
552         if buf, err = state.ReadSP(conn); err != nil {
553                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
554                 return err
555         }
556         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
557                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
558                 return err
559         }
560
561         var node *Node
562         for _, n := range state.Ctx.Neigh {
563                 if n.NoisePub == nil {
564                         continue
565                 }
566                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
567                         node = n
568                         break
569                 }
570         }
571         if node == nil {
572                 peerId := Base32Codec.EncodeToString(state.hs.PeerStatic())
573                 err = errors.New("unknown peer: " + peerId)
574                 state.Ctx.LogE("sp-startR-unknown", append(les, LE{"Peer", peerId}), err, logMsg)
575                 return err
576         }
577         state.Node = node
578         state.rxRate = node.RxRate
579         state.txRate = node.TxRate
580         state.onlineDeadline = node.OnlineDeadline
581         state.maxOnlineTime = node.MaxOnlineTime
582         les = LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}}
583
584         if err = state.Ctx.ensureRxDir(node.Id); err != nil {
585                 return err
586         }
587         var rxLock *os.File
588         if xxOnly == "" || xxOnly == TRx {
589                 rxLock, err = state.Ctx.LockDir(node.Id, string(TRx))
590                 if err != nil {
591                         return err
592                 }
593         }
594         state.rxLock = rxLock
595         var txLock *os.File
596         if xxOnly == "" || xxOnly == TTx {
597                 txLock, err = state.Ctx.LockDir(node.Id, string(TTx))
598                 if err != nil {
599                         return err
600                 }
601         }
602         state.txLock = txLock
603
604         var infosPayloads [][]byte
605         if xxOnly == "" || xxOnly == TTx {
606                 infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
607         }
608         var firstPayload []byte
609         if len(infosPayloads) > 0 {
610                 firstPayload = infosPayloads[0]
611         }
612         // Pad first payload, to hide actual number of existing files
613         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
614                 firstPayload = append(firstPayload, SPHaltMarshalized...)
615         }
616
617         state.Ctx.LogD("sp-startR-write", les, func(les LEs) string {
618                 return fmt.Sprintf(
619                         "SP with %s (nice %s): sending first message",
620                         node.Name, NicenessFmt(state.Nice),
621                 )
622         })
623         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
624         if err != nil {
625                 state.dirUnlock()
626                 return err
627         }
628         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
629         if err = state.WriteSP(conn, buf, false); err != nil {
630                 state.Ctx.LogE("sp-startR-write", les, err, func(les LEs) string {
631                         return fmt.Sprintf(
632                                 "SP with %s (nice %s): writing",
633                                 node.Name, NicenessFmt(state.Nice),
634                         )
635                 })
636                 state.dirUnlock()
637                 return err
638         }
639         state.Ctx.LogD("sp-startR-workers", les, func(les LEs) string {
640                 return fmt.Sprintf(
641                         "SP with %s (nice %s): starting workers",
642                         node.Name, NicenessFmt(state.Nice),
643                 )
644         })
645         err = state.StartWorkers(conn, infosPayloads, payload)
646         if err != nil {
647                 state.dirUnlock()
648         }
649         return err
650 }
651
652 func (state *SPState) closeFd(pth string) {
653         state.fdsLock.Lock()
654         if s, exists := state.fds[pth]; exists {
655                 delete(state.fds, pth)
656                 s.fd.Close()
657         }
658         state.fdsLock.Unlock()
659 }
660
661 func (state *SPState) StartWorkers(
662         conn ConnDeadlined,
663         infosPayloads [][]byte,
664         payload []byte,
665 ) error {
666         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
667         state.fds = make(map[string]FdAndFullSize)
668         state.fileHashers = make(map[string]*MTHAndOffset)
669         state.isDead = make(chan struct{})
670         if state.maxOnlineTime > 0 {
671                 state.mustFinishAt = state.started.Add(state.maxOnlineTime)
672         }
673         if !state.NoCK {
674                 spCheckerOnce.Do(func() { go SPChecker(state.Ctx) })
675                 go func() {
676                         for job := range state.Ctx.JobsNoCK(state.Node.Id) {
677                                 if job.PktEnc.Nice <= state.Nice {
678                                         spCheckerTasks <- SPCheckerTask{
679                                                 nodeId: state.Node.Id,
680                                                 hsh:    job.HshValue,
681                                                 done:   state.payloads,
682                                         }
683                                 }
684                         }
685                 }()
686         }
687
688         // Remaining handshake payload sending
689         if len(infosPayloads) > 1 {
690                 state.wg.Add(1)
691                 go func() {
692                         for _, payload := range infosPayloads[1:] {
693                                 state.Ctx.LogD(
694                                         "sp-queue-remaining",
695                                         append(les, LE{"Size", int64(len(payload))}),
696                                         func(les LEs) string {
697                                                 return fmt.Sprintf(
698                                                         "SP with %s (nice %s): queuing remaining payload (%s)",
699                                                         state.Node.Name, NicenessFmt(state.Nice),
700                                                         humanize.IBytes(uint64(len(payload))),
701                                                 )
702                                         },
703                                 )
704                                 state.payloads <- payload
705                         }
706                         state.wg.Done()
707                 }()
708         }
709
710         // Processing of first payload and queueing its responses
711         logMsg := func(les LEs) string {
712                 return fmt.Sprintf(
713                         "SP with %s (nice %s): processing first payload (%s)",
714                         state.Node.Name, NicenessFmt(state.Nice),
715                         humanize.IBytes(uint64(len(payload))),
716                 )
717         }
718         state.Ctx.LogD("sp-process", append(les, LE{"Size", int64(len(payload))}), logMsg)
719         replies, err := state.ProcessSP(payload)
720         if err != nil {
721                 state.Ctx.LogE("sp-process", les, err, logMsg)
722                 return err
723         }
724         state.wg.Add(1)
725         go func() {
726                 for _, reply := range replies {
727                         state.Ctx.LogD(
728                                 "sp-queue-reply",
729                                 append(les, LE{"Size", int64(len(reply))}),
730                                 func(les LEs) string {
731                                         return fmt.Sprintf(
732                                                 "SP with %s (nice %s): queuing reply (%s)",
733                                                 state.Node.Name, NicenessFmt(state.Nice),
734                                                 humanize.IBytes(uint64(len(payload))),
735                                         )
736                                 },
737                         )
738                         state.payloads <- reply
739                 }
740                 state.wg.Done()
741         }()
742
743         // Periodic jobs
744         state.wg.Add(1)
745         go func() {
746                 deadlineTicker := time.NewTicker(time.Second)
747                 pingTicker := time.NewTicker(PingTimeout)
748                 for {
749                         select {
750                         case <-state.isDead:
751                                 state.wg.Done()
752                                 deadlineTicker.Stop()
753                                 pingTicker.Stop()
754                                 return
755                         case now := <-deadlineTicker.C:
756                                 if now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
757                                         now.Sub(state.TxLastNonPing) >= state.onlineDeadline {
758                                         goto Deadlined
759                                 }
760                                 if state.maxOnlineTime > 0 && state.mustFinishAt.Before(now) {
761                                         goto Deadlined
762                                 }
763                                 if now.Sub(state.RxLastSeen) >= 2*PingTimeout {
764                                         goto Deadlined
765                                 }
766                                 break
767                         Deadlined:
768                                 state.SetDead()
769                                 conn.Close()
770                         case now := <-pingTicker.C:
771                                 if now.After(state.TxLastSeen.Add(PingTimeout)) {
772                                         state.wg.Add(1)
773                                         go func() {
774                                                 state.pings <- struct{}{}
775                                                 state.wg.Done()
776                                         }()
777                                 }
778                         }
779                 }
780         }()
781
782         // Spool checker and INFOs sender of appearing files
783         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
784                 state.wg.Add(1)
785                 go func() {
786                         dw, err := state.Ctx.NewDirWatcher(
787                                 filepath.Join(state.Ctx.Spool, state.Node.Id.String(), string(TTx)),
788                                 time.Second,
789                         )
790                         if err != nil {
791                                 state.Ctx.LogE("sp-queue-dir-watch", les, err, logMsg)
792                                 log.Fatalln(err)
793                         }
794                         for {
795                                 select {
796                                 case <-state.isDead:
797                                         dw.Close()
798                                         state.wg.Done()
799                                         return
800                                 case <-dw.C:
801                                         for _, payload := range state.Ctx.infosOur(
802                                                 state.Node.Id,
803                                                 state.Nice,
804                                                 &state.infosOurSeen,
805                                         ) {
806                                                 state.Ctx.LogD(
807                                                         "sp-queue-info",
808                                                         append(les, LE{"Size", int64(len(payload))}),
809                                                         func(les LEs) string {
810                                                                 return fmt.Sprintf(
811                                                                         "SP with %s (nice %s): queuing new info (%s)",
812                                                                         state.Node.Name, NicenessFmt(state.Nice),
813                                                                         humanize.IBytes(uint64(len(payload))),
814                                                                 )
815                                                         },
816                                                 )
817                                                 state.payloads <- payload
818                                         }
819                                 }
820                         }
821                 }()
822         }
823
824         // Sender
825         state.wg.Add(1)
826         go func() {
827                 defer conn.Close()
828                 defer state.SetDead()
829                 defer state.wg.Done()
830                 buf := make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
831                 for {
832                         if state.NotAlive() {
833                                 return
834                         }
835                         var payload []byte
836                         var ping bool
837                         select {
838                         case <-state.pings:
839                                 state.Ctx.LogD("sp-got-ping", les, func(les LEs) string {
840                                         return fmt.Sprintf(
841                                                 "SP with %s (nice %s): got ping",
842                                                 state.Node.Name, NicenessFmt(state.Nice),
843                                         )
844                                 })
845                                 payload = SPPingMarshalized
846                                 ping = true
847                         case payload = <-state.payloads:
848                                 state.Ctx.LogD(
849                                         "sp-got-payload",
850                                         append(les, LE{"Size", int64(len(payload))}),
851                                         func(les LEs) string {
852                                                 return fmt.Sprintf(
853                                                         "SP with %s (nice %s): got payload (%s)",
854                                                         state.Node.Name, NicenessFmt(state.Nice),
855                                                         humanize.IBytes(uint64(len(payload))),
856                                                 )
857                                         },
858                                 )
859                         default:
860                                 state.RLock()
861                                 if len(state.queueTheir) == 0 {
862                                         state.RUnlock()
863                                         time.Sleep(100 * time.Millisecond)
864                                         continue
865                                 }
866                                 freq := state.queueTheir[0].freq
867                                 state.RUnlock()
868                                 if state.txRate > 0 {
869                                         time.Sleep(time.Second / time.Duration(state.txRate))
870                                 }
871                                 pktName := Base32Codec.EncodeToString(freq.Hash[:])
872                                 lesp := append(
873                                         les,
874                                         LE{"XX", string(TTx)},
875                                         LE{"Pkt", pktName},
876                                         LE{"Size", int64(freq.Offset)},
877                                 )
878                                 logMsg := func(les LEs) string {
879                                         return fmt.Sprintf(
880                                                 "SP with %s (nice %s): tx/%s (%s)",
881                                                 state.Node.Name, NicenessFmt(state.Nice),
882                                                 pktName,
883                                                 humanize.IBytes(freq.Offset),
884                                         )
885                                 }
886                                 state.Ctx.LogD("sp-queue", lesp, func(les LEs) string {
887                                         return logMsg(les) + ": queueing"
888                                 })
889                                 pth := filepath.Join(
890                                         state.Ctx.Spool,
891                                         state.Node.Id.String(),
892                                         string(TTx),
893                                         Base32Codec.EncodeToString(freq.Hash[:]),
894                                 )
895                                 state.fdsLock.RLock()
896                                 fdAndFullSize, exists := state.fds[pth]
897                                 state.fdsLock.RUnlock()
898                                 if !exists {
899                                         state.Ctx.LogD("sp-queue-open", lesp, func(les LEs) string {
900                                                 return logMsg(les) + ": opening"
901                                         })
902                                         fd, err := os.Open(pth)
903                                         if err != nil {
904                                                 state.Ctx.LogE("sp-queue-open", lesp, err, func(les LEs) string {
905                                                         return logMsg(les) + ": opening"
906                                                 })
907                                                 return
908                                         }
909                                         fi, err := fd.Stat()
910                                         if err != nil {
911                                                 state.Ctx.LogE("sp-queue-stat", lesp, err, func(les LEs) string {
912                                                         return logMsg(les) + ": stating"
913                                                 })
914                                                 return
915                                         }
916                                         fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
917                                         state.fdsLock.Lock()
918                                         state.fds[pth] = fdAndFullSize
919                                         state.fdsLock.Unlock()
920                                 }
921                                 fd := fdAndFullSize.fd
922                                 fullSize := fdAndFullSize.fullSize
923                                 lesp = append(lesp, LE{"FullSize", fullSize})
924                                 var bufRead []byte
925                                 if freq.Offset < uint64(fullSize) {
926                                         state.Ctx.LogD("sp-file-seek", lesp, func(les LEs) string {
927                                                 return logMsg(les) + ": seeking"
928                                         })
929                                         if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
930                                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
931                                                         return logMsg(les) + ": seeking"
932                                                 })
933                                                 return
934                                         }
935                                         n, err := fd.Read(buf)
936                                         if err != nil {
937                                                 state.Ctx.LogE("sp-file-read", lesp, err, func(les LEs) string {
938                                                         return logMsg(les) + ": reading"
939                                                 })
940                                                 return
941                                         }
942                                         bufRead = buf[:n]
943                                         lesp = append(
944                                                 les,
945                                                 LE{"XX", string(TTx)},
946                                                 LE{"Pkt", pktName},
947                                                 LE{"Size", int64(n)},
948                                                 LE{"FullSize", fullSize},
949                                         )
950                                         state.Ctx.LogD("sp-file-read", lesp, func(les LEs) string {
951                                                 return fmt.Sprintf(
952                                                         "%s: read %s",
953                                                         logMsg(les), humanize.IBytes(uint64(n)),
954                                                 )
955                                         })
956                                 } else {
957                                         state.closeFd(pth)
958                                 }
959                                 payload = MarshalSP(SPTypeFile, SPFile{
960                                         Hash:    freq.Hash,
961                                         Offset:  freq.Offset,
962                                         Payload: bufRead,
963                                 })
964                                 ourSize := freq.Offset + uint64(len(bufRead))
965                                 lesp = append(
966                                         les,
967                                         LE{"XX", string(TTx)},
968                                         LE{"Pkt", pktName},
969                                         LE{"Size", int64(ourSize)},
970                                         LE{"FullSize", fullSize},
971                                 )
972                                 if state.Ctx.ShowPrgrs {
973                                         state.progressBars[pktName] = struct{}{}
974                                         Progress("Tx", lesp)
975                                 }
976                                 if ourSize == uint64(fullSize) {
977                                         state.closeFd(pth)
978                                         state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string {
979                                                 return logMsg(les) + ": finished"
980                                         })
981                                         if state.Ctx.ShowPrgrs {
982                                                 delete(state.progressBars, pktName)
983                                         }
984                                 }
985                                 state.Lock()
986                                 for i, q := range state.queueTheir {
987                                         if *q.freq.Hash != *freq.Hash {
988                                                 continue
989                                         }
990                                         if ourSize == uint64(fullSize) {
991                                                 state.queueTheir = append(
992                                                         state.queueTheir[:i],
993                                                         state.queueTheir[i+1:]...,
994                                                 )
995                                         } else {
996                                                 q.freq.Offset = ourSize
997                                         }
998                                         break
999                                 }
1000                                 state.Unlock()
1001                         }
1002                         logMsg := func(les LEs) string {
1003                                 return fmt.Sprintf(
1004                                         "SP with %s (nice %s): sending %s",
1005                                         state.Node.Name, NicenessFmt(state.Nice),
1006                                         humanize.IBytes(uint64(len(payload))),
1007                                 )
1008                         }
1009                         state.Ctx.LogD("sp-sending", append(les, LE{"Size", int64(len(payload))}), logMsg)
1010                         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
1011                         ct, err := state.csOur.Encrypt(nil, nil, payload)
1012                         if err != nil {
1013                                 state.Ctx.LogE("sp-encrypting", les, err, logMsg)
1014                                 return
1015                         }
1016                         if err := state.WriteSP(conn, ct, ping); err != nil {
1017                                 state.Ctx.LogE("sp-sending", les, err, logMsg)
1018                                 return
1019                         }
1020                 }
1021         }()
1022
1023         // Receiver
1024         state.wg.Add(1)
1025         go func() {
1026                 for {
1027                         if state.NotAlive() {
1028                                 break
1029                         }
1030                         logMsg := func(les LEs) string {
1031                                 return fmt.Sprintf(
1032                                         "SP with %s (nice %s): waiting for payload",
1033                                         state.Node.Name, NicenessFmt(state.Nice),
1034                                 )
1035                         }
1036                         state.Ctx.LogD("sp-recv-wait", les, logMsg)
1037                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
1038                         payload, err := state.ReadSP(conn)
1039                         if err != nil {
1040                                 if err == io.EOF {
1041                                         break
1042                                 }
1043                                 unmarshalErr := err.(*xdr.UnmarshalError)
1044                                 if os.IsTimeout(unmarshalErr.Err) {
1045                                         continue
1046                                 }
1047                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
1048                                         break
1049                                 }
1050                                 state.Ctx.LogE("sp-recv-wait", les, err, logMsg)
1051                                 break
1052                         }
1053                         logMsg = func(les LEs) string {
1054                                 return fmt.Sprintf(
1055                                         "SP with %s (nice %s): payload (%s)",
1056                                         state.Node.Name, NicenessFmt(state.Nice),
1057                                         humanize.IBytes(uint64(len(payload))),
1058                                 )
1059                         }
1060                         state.Ctx.LogD(
1061                                 "sp-recv-got",
1062                                 append(les, LE{"Size", int64(len(payload))}),
1063                                 func(les LEs) string { return logMsg(les) + ": got" },
1064                         )
1065                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
1066                         if err != nil {
1067                                 state.Ctx.LogE("sp-recv-got", les, err, func(les LEs) string {
1068                                         return logMsg(les) + ": got"
1069                                 })
1070                                 break
1071                         }
1072                         state.Ctx.LogD(
1073                                 "sp-recv-process",
1074                                 append(les, LE{"Size", int64(len(payload))}),
1075                                 func(les LEs) string {
1076                                         return logMsg(les) + ": processing"
1077                                 },
1078                         )
1079                         replies, err := state.ProcessSP(payload)
1080                         if err != nil {
1081                                 state.Ctx.LogE("sp-recv-process", les, err, func(les LEs) string {
1082                                         return logMsg(les) + ": processing"
1083                                 })
1084                                 break
1085                         }
1086                         state.wg.Add(1)
1087                         go func() {
1088                                 for _, reply := range replies {
1089                                         state.Ctx.LogD(
1090                                                 "sp-recv-reply",
1091                                                 append(les[:len(les)-1], LE{"Size", int64(len(reply))}),
1092                                                 func(les LEs) string {
1093                                                         return fmt.Sprintf(
1094                                                                 "SP with %s (nice %s): queuing reply (%s)",
1095                                                                 state.Node.Name, NicenessFmt(state.Nice),
1096                                                                 humanize.IBytes(uint64(len(reply))),
1097                                                         )
1098                                                 },
1099                                         )
1100                                         state.payloads <- reply
1101                                 }
1102                                 state.wg.Done()
1103                         }()
1104                         if state.rxRate > 0 {
1105                                 time.Sleep(time.Second / time.Duration(state.rxRate))
1106                         }
1107                 }
1108                 state.SetDead()
1109                 state.wg.Done()
1110                 state.SetDead()
1111                 conn.Close()
1112         }()
1113
1114         return nil
1115 }
1116
1117 func (state *SPState) Wait() bool {
1118         state.wg.Wait()
1119         close(state.payloads)
1120         close(state.pings)
1121         state.Duration = time.Now().Sub(state.started)
1122         state.dirUnlock()
1123         state.RxSpeed = state.RxBytes
1124         state.TxSpeed = state.TxBytes
1125         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
1126         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
1127         if rxDuration > 0 {
1128                 state.RxSpeed = state.RxBytes / rxDuration
1129         }
1130         if txDuration > 0 {
1131                 state.TxSpeed = state.TxBytes / txDuration
1132         }
1133         nothingLeft := len(state.queueTheir) == 0
1134         for _, s := range state.fds {
1135                 nothingLeft = false
1136                 s.fd.Close()
1137         }
1138         for pktName := range state.progressBars {
1139                 ProgressKill(pktName)
1140         }
1141         return nothingLeft
1142 }
1143
1144 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
1145         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
1146         r := bytes.NewReader(payload)
1147         var err error
1148         var replies [][]byte
1149         var infosGot bool
1150         for r.Len() > 0 {
1151                 state.Ctx.LogD("sp-process-unmarshal", les, func(les LEs) string {
1152                         return fmt.Sprintf(
1153                                 "SP with %s (nice %s): unmarshaling header",
1154                                 state.Node.Name, NicenessFmt(state.Nice),
1155                         )
1156                 })
1157                 var head SPHead
1158                 if _, err = xdr.Unmarshal(r, &head); err != nil {
1159                         state.Ctx.LogE("sp-process-unmarshal", les, err, func(les LEs) string {
1160                                 return fmt.Sprintf(
1161                                         "SP with %s (nice %s): unmarshaling header",
1162                                         state.Node.Name, NicenessFmt(state.Nice),
1163                                 )
1164                         })
1165                         return nil, err
1166                 }
1167                 if head.Type != SPTypePing {
1168                         state.RxLastNonPing = state.RxLastSeen
1169                 }
1170                 switch head.Type {
1171                 case SPTypeHalt:
1172                         state.Ctx.LogD(
1173                                 "sp-process-halt",
1174                                 append(les, LE{"Type", "halt"}), func(les LEs) string {
1175                                         return fmt.Sprintf(
1176                                                 "SP with %s (nice %s): got HALT",
1177                                                 state.Node.Name, NicenessFmt(state.Nice),
1178                                         )
1179                                 },
1180                         )
1181                         state.Lock()
1182                         state.queueTheir = nil
1183                         state.Unlock()
1184
1185                 case SPTypePing:
1186                         state.Ctx.LogD(
1187                                 "sp-process-ping",
1188                                 append(les, LE{"Type", "ping"}),
1189                                 func(les LEs) string {
1190                                         return fmt.Sprintf(
1191                                                 "SP with %s (nice %s): got PING",
1192                                                 state.Node.Name, NicenessFmt(state.Nice),
1193                                         )
1194                                 },
1195                         )
1196
1197                 case SPTypeInfo:
1198                         infosGot = true
1199                         lesp := append(les, LE{"Type", "info"})
1200                         state.Ctx.LogD(
1201                                 "sp-process-info-unmarshal", lesp,
1202                                 func(les LEs) string {
1203                                         return fmt.Sprintf(
1204                                                 "SP with %s (nice %s): unmarshaling INFO",
1205                                                 state.Node.Name, NicenessFmt(state.Nice),
1206                                         )
1207                                 },
1208                         )
1209                         var info SPInfo
1210                         if _, err = xdr.Unmarshal(r, &info); err != nil {
1211                                 state.Ctx.LogE(
1212                                         "sp-process-info-unmarshal", lesp, err,
1213                                         func(les LEs) string {
1214                                                 return fmt.Sprintf(
1215                                                         "SP with %s (nice %s): unmarshaling INFO",
1216                                                         state.Node.Name, NicenessFmt(state.Nice),
1217                                                 )
1218                                         },
1219                                 )
1220                                 return nil, err
1221                         }
1222                         pktName := Base32Codec.EncodeToString(info.Hash[:])
1223                         lesp = append(
1224                                 lesp,
1225                                 LE{"Pkt", pktName},
1226                                 LE{"Size", int64(info.Size)},
1227                                 LE{"PktNice", int(info.Nice)},
1228                         )
1229                         logMsg := func(les LEs) string {
1230                                 return fmt.Sprintf(
1231                                         "SP with %s (nice %s): INFO %s (%s) nice %s",
1232                                         state.Node.Name, NicenessFmt(state.Nice),
1233                                         pktName,
1234                                         humanize.IBytes(info.Size),
1235                                         NicenessFmt(info.Nice),
1236                                 )
1237                         }
1238                         if !state.listOnly && info.Nice > state.Nice {
1239                                 state.Ctx.LogD("sp-process-info-too-nice", lesp, func(les LEs) string {
1240                                         return logMsg(les) + ": too nice"
1241                                 })
1242                                 continue
1243                         }
1244                         state.Ctx.LogD("sp-process-info-got", lesp, func(les LEs) string {
1245                                 return logMsg(les) + ": received"
1246                         })
1247                         if !state.listOnly && state.xxOnly == TTx {
1248                                 continue
1249                         }
1250                         state.Lock()
1251                         state.infosTheir[*info.Hash] = &info
1252                         state.Unlock()
1253                         state.Ctx.LogD("sp-process-info-stat", lesp, func(les LEs) string {
1254                                 return logMsg(les) + ": stating part"
1255                         })
1256                         pktPath := filepath.Join(
1257                                 state.Ctx.Spool,
1258                                 state.Node.Id.String(),
1259                                 string(TRx),
1260                                 Base32Codec.EncodeToString(info.Hash[:]),
1261                         )
1262                         logMsg = func(les LEs) string {
1263                                 return fmt.Sprintf(
1264                                         "Packet %s (%s) (nice %s)",
1265                                         pktName,
1266                                         humanize.IBytes(info.Size),
1267                                         NicenessFmt(info.Nice),
1268                                 )
1269                         }
1270                         if _, err = os.Stat(pktPath); err == nil {
1271                                 state.Ctx.LogI("sp-info-done", lesp, func(les LEs) string {
1272                                         return logMsg(les) + ": already done"
1273                                 })
1274                                 if !state.listOnly {
1275                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1276                                 }
1277                                 continue
1278                         }
1279                         if _, err = os.Stat(filepath.Join(
1280                                 state.Ctx.Spool, state.Node.Id.String(), string(TRx),
1281                                 SeenDir, Base32Codec.EncodeToString(info.Hash[:]),
1282                         )); err == nil {
1283                                 state.Ctx.LogI("sp-info-seen", lesp, func(les LEs) string {
1284                                         return logMsg(les) + ": already seen"
1285                                 })
1286                                 if !state.listOnly {
1287                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1288                                 }
1289                                 continue
1290                         }
1291                         if _, err = os.Stat(pktPath + NoCKSuffix); err == nil {
1292                                 state.Ctx.LogI("sp-info-nock", lesp, func(les LEs) string {
1293                                         return logMsg(les) + ": still not checksummed"
1294                                 })
1295                                 continue
1296                         }
1297                         fi, err := os.Stat(pktPath + PartSuffix)
1298                         var offset int64
1299                         if err == nil {
1300                                 offset = fi.Size()
1301                         }
1302                         if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
1303                                 state.Ctx.LogI("sp-info-no-space", lesp, func(les LEs) string {
1304                                         return logMsg(les) + ": not enough space"
1305                                 })
1306                                 continue
1307                         }
1308                         state.Ctx.LogI(
1309                                 "sp-info",
1310                                 append(lesp, LE{"Offset", offset}),
1311                                 func(les LEs) string {
1312                                         return fmt.Sprintf(
1313                                                 "%s: %d%%", logMsg(les), 100*uint64(offset)/info.Size,
1314                                         )
1315                                 },
1316                         )
1317                         if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
1318                                 replies = append(replies, MarshalSP(
1319                                         SPTypeFreq,
1320                                         SPFreq{info.Hash, uint64(offset)},
1321                                 ))
1322                         }
1323
1324                 case SPTypeFile:
1325                         lesp := append(les, LE{"Type", "file"})
1326                         state.Ctx.LogD("sp-process-file", lesp, func(les LEs) string {
1327                                 return fmt.Sprintf(
1328                                         "SP with %s (nice %s): unmarshaling FILE",
1329                                         state.Node.Name, NicenessFmt(state.Nice),
1330                                 )
1331                         })
1332                         var file SPFile
1333                         if _, err = xdr.Unmarshal(r, &file); err != nil {
1334                                 state.Ctx.LogE("sp-process-file", lesp, err, func(les LEs) string {
1335                                         return fmt.Sprintf(
1336                                                 "SP with %s (nice %s): unmarshaling FILE",
1337                                                 state.Node.Name, NicenessFmt(state.Nice),
1338                                         )
1339                                 })
1340                                 return nil, err
1341                         }
1342                         pktName := Base32Codec.EncodeToString(file.Hash[:])
1343                         lesp = append(
1344                                 lesp,
1345                                 LE{"XX", string(TRx)},
1346                                 LE{"Pkt", pktName},
1347                                 LE{"Size", int64(len(file.Payload))},
1348                         )
1349                         logMsg := func(les LEs) string {
1350                                 return fmt.Sprintf(
1351                                         "Got packet %s (%s)",
1352                                         pktName, humanize.IBytes(uint64(len(file.Payload))),
1353                                 )
1354                         }
1355                         fullsize := int64(0)
1356                         state.RLock()
1357                         infoTheir := state.infosTheir[*file.Hash]
1358                         state.RUnlock()
1359                         if infoTheir == nil {
1360                                 state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string {
1361                                         return logMsg(les) + ": unknown file"
1362                                 })
1363                                 continue
1364                         }
1365                         fullsize = int64(infoTheir.Size)
1366                         lesp = append(lesp, LE{"FullSize", fullsize})
1367                         dirToSync := filepath.Join(
1368                                 state.Ctx.Spool,
1369                                 state.Node.Id.String(),
1370                                 string(TRx),
1371                         )
1372                         filePath := filepath.Join(dirToSync, pktName)
1373                         filePathPart := filePath + PartSuffix
1374                         state.Ctx.LogD("sp-file-open", lesp, func(les LEs) string {
1375                                 return logMsg(les) + ": opening part"
1376                         })
1377                         state.fdsLock.RLock()
1378                         fdAndFullSize, exists := state.fds[filePathPart]
1379                         state.fdsLock.RUnlock()
1380                         hasherAndOffset := state.fileHashers[filePath]
1381                         var fd *os.File
1382                         if exists {
1383                                 fd = fdAndFullSize.fd
1384                         } else {
1385                                 fd, err = os.OpenFile(
1386                                         filePathPart,
1387                                         os.O_RDWR|os.O_CREATE,
1388                                         os.FileMode(0666),
1389                                 )
1390                                 if err != nil {
1391                                         state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string {
1392                                                 return logMsg(les) + ": opening part"
1393                                         })
1394                                         return nil, err
1395                                 }
1396                                 state.fdsLock.Lock()
1397                                 state.fds[filePathPart] = FdAndFullSize{fd: fd}
1398                                 state.fdsLock.Unlock()
1399                                 if !state.NoCK {
1400                                         hasherAndOffset = &MTHAndOffset{
1401                                                 mth:    MTHNew(fullsize, int64(file.Offset)),
1402                                                 offset: file.Offset,
1403                                         }
1404                                         state.fileHashers[filePath] = hasherAndOffset
1405                                 }
1406                         }
1407                         state.Ctx.LogD(
1408                                 "sp-file-seek",
1409                                 append(lesp, LE{"Offset", file.Offset}),
1410                                 func(les LEs) string {
1411                                         return fmt.Sprintf("%s: seeking %d", logMsg(les), file.Offset)
1412                                 })
1413                         if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
1414                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1415                                         return logMsg(les) + ": seeking"
1416                                 })
1417                                 state.closeFd(filePathPart)
1418                                 return nil, err
1419                         }
1420                         state.Ctx.LogD("sp-file-write", lesp, func(les LEs) string {
1421                                 return logMsg(les) + ": writing"
1422                         })
1423                         if _, err = fd.Write(file.Payload); err != nil {
1424                                 state.Ctx.LogE("sp-file-write", lesp, err, func(les LEs) string {
1425                                         return logMsg(les) + ": writing"
1426                                 })
1427                                 state.closeFd(filePathPart)
1428                                 return nil, err
1429                         }
1430                         if hasherAndOffset != nil {
1431                                 if hasherAndOffset.offset == file.Offset {
1432                                         if _, err = hasherAndOffset.mth.Write(file.Payload); err != nil {
1433                                                 panic(err)
1434                                         }
1435                                         hasherAndOffset.offset += uint64(len(file.Payload))
1436                                 } else {
1437                                         state.Ctx.LogE(
1438                                                 "sp-file-offset-differs", lesp, errors.New("offset differs"),
1439                                                 func(les LEs) string {
1440                                                         return logMsg(les) + ": deleting hasher"
1441                                                 },
1442                                         )
1443                                         delete(state.fileHashers, filePath)
1444                                         hasherAndOffset = nil
1445                                 }
1446                         }
1447                         ourSize := int64(file.Offset + uint64(len(file.Payload)))
1448                         lesp[len(lesp)-2].V = ourSize
1449                         if state.Ctx.ShowPrgrs {
1450                                 state.progressBars[pktName] = struct{}{}
1451                                 Progress("Rx", lesp)
1452                         }
1453                         if fullsize != ourSize {
1454                                 continue
1455                         }
1456                         if state.Ctx.ShowPrgrs {
1457                                 delete(state.progressBars, pktName)
1458                         }
1459                         logMsg = func(les LEs) string {
1460                                 return fmt.Sprintf(
1461                                         "Got packet %s %d%% (%s / %s)",
1462                                         pktName, 100*ourSize/fullsize,
1463                                         humanize.IBytes(uint64(ourSize)),
1464                                         humanize.IBytes(uint64(fullsize)),
1465                                 )
1466                         }
1467                         if !NoSync {
1468                                 err = fd.Sync()
1469                                 if err != nil {
1470                                         state.Ctx.LogE("sp-file-sync", lesp, err, func(les LEs) string {
1471                                                 return logMsg(les) + ": syncing"
1472                                         })
1473                                         state.closeFd(filePathPart)
1474                                         continue
1475                                 }
1476                         }
1477                         if hasherAndOffset != nil {
1478                                 delete(state.fileHashers, filePath)
1479                                 if hasherAndOffset.mth.PreaddSize() == 0 {
1480                                         if bytes.Compare(hasherAndOffset.mth.Sum(nil), file.Hash[:]) != 0 {
1481                                                 state.Ctx.LogE(
1482                                                         "sp-file-bad-checksum", lesp,
1483                                                         errors.New("checksum mismatch"),
1484                                                         logMsg,
1485                                                 )
1486                                                 state.closeFd(filePathPart)
1487                                                 continue
1488                                         }
1489                                         if err = os.Rename(filePathPart, filePath); err != nil {
1490                                                 state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1491                                                         return logMsg(les) + ": renaming"
1492                                                 })
1493                                                 state.closeFd(filePathPart)
1494                                                 continue
1495                                         }
1496                                         if err = DirSync(dirToSync); err != nil {
1497                                                 state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1498                                                         return logMsg(les) + ": dirsyncing"
1499                                                 })
1500                                                 state.closeFd(filePathPart)
1501                                                 continue
1502                                         }
1503                                         state.Ctx.LogI("sp-file-done", lesp, func(les LEs) string {
1504                                                 return logMsg(les) + ": done"
1505                                         })
1506                                         state.wg.Add(1)
1507                                         go func() {
1508                                                 state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
1509                                                 state.wg.Done()
1510                                         }()
1511                                         state.Lock()
1512                                         delete(state.infosTheir, *file.Hash)
1513                                         state.Unlock()
1514                                         if !state.Ctx.HdrUsage {
1515                                                 continue
1516                                         }
1517                                         if _, err = fd.Seek(0, io.SeekStart); err != nil {
1518                                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1519                                                         return logMsg(les) + ": seeking"
1520                                                 })
1521                                                 state.closeFd(filePathPart)
1522                                                 continue
1523                                         }
1524                                         _, pktEncRaw, err := state.Ctx.HdrRead(fd)
1525                                         state.closeFd(filePathPart)
1526                                         if err != nil {
1527                                                 state.Ctx.LogE("sp-file-hdr-read", lesp, err, func(les LEs) string {
1528                                                         return logMsg(les) + ": HdrReading"
1529                                                 })
1530                                                 continue
1531                                         }
1532                                         state.Ctx.HdrWrite(pktEncRaw, filePath)
1533                                         continue
1534                                 }
1535                         }
1536                         state.closeFd(filePathPart)
1537                         if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil {
1538                                 state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1539                                         return logMsg(les) + ": renaming"
1540                                 })
1541                                 continue
1542                         }
1543                         if err = DirSync(dirToSync); err != nil {
1544                                 state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1545                                         return logMsg(les) + ": dirsyncing"
1546                                 })
1547                                 continue
1548                         }
1549                         state.Ctx.LogI("sp-file-downloaded", lesp, func(les LEs) string {
1550                                 return logMsg(les) + ": downloaded"
1551                         })
1552                         state.Lock()
1553                         delete(state.infosTheir, *file.Hash)
1554                         state.Unlock()
1555                         go func() {
1556                                 t := SPCheckerTask{
1557                                         nodeId: state.Node.Id,
1558                                         hsh:    file.Hash,
1559                                         done:   state.payloads,
1560                                 }
1561                                 if hasherAndOffset != nil {
1562                                         t.mth = hasherAndOffset.mth
1563                                 }
1564                                 spCheckerTasks <- t
1565                         }()
1566
1567                 case SPTypeDone:
1568                         lesp := append(les, LE{"Type", "done"})
1569                         state.Ctx.LogD("sp-process-done-unmarshal", lesp, func(les LEs) string {
1570                                 return fmt.Sprintf(
1571                                         "SP with %s (nice %s): unmarshaling DONE",
1572                                         state.Node.Name, NicenessFmt(state.Nice),
1573                                 )
1574                         })
1575                         var done SPDone
1576                         if _, err = xdr.Unmarshal(r, &done); err != nil {
1577                                 state.Ctx.LogE("sp-process-done-unmarshal", lesp, err, func(les LEs) string {
1578                                         return fmt.Sprintf(
1579                                                 "SP with %s (nice %s): unmarshaling DONE",
1580                                                 state.Node.Name, NicenessFmt(state.Nice),
1581                                         )
1582                                 })
1583                                 return nil, err
1584                         }
1585                         pktName := Base32Codec.EncodeToString(done.Hash[:])
1586                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"XX", string(TTx)})
1587                         logMsg := func(les LEs) string {
1588                                 return fmt.Sprintf(
1589                                         "SP with %s (nice %s): DONE: removing %s",
1590                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1591                                 )
1592                         }
1593                         state.Ctx.LogD("sp-done", lesp, logMsg)
1594                         pth := filepath.Join(
1595                                 state.Ctx.Spool,
1596                                 state.Node.Id.String(),
1597                                 string(TTx),
1598                                 pktName,
1599                         )
1600                         if err = os.Remove(pth); err == nil {
1601                                 state.Ctx.LogI("sp-done", lesp, func(les LEs) string {
1602                                         return fmt.Sprintf("Packet %s is sent", pktName)
1603                                 })
1604                                 if state.Ctx.HdrUsage {
1605                                         os.Remove(JobPath2Hdr(pth))
1606                                 }
1607                         } else {
1608                                 state.Ctx.LogE("sp-done", lesp, err, logMsg)
1609                         }
1610
1611                 case SPTypeFreq:
1612                         lesp := append(les, LE{"Type", "freq"})
1613                         state.Ctx.LogD("sp-process-freq", lesp, func(les LEs) string {
1614                                 return fmt.Sprintf(
1615                                         "SP with %s (nice %s): unmarshaling FREQ",
1616                                         state.Node.Name, NicenessFmt(state.Nice),
1617                                 )
1618                         })
1619                         var freq SPFreq
1620                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
1621                                 state.Ctx.LogE("sp-process-freq", lesp, err, func(les LEs) string {
1622                                         return fmt.Sprintf(
1623                                                 "SP with %s (nice %s): unmarshaling FREQ",
1624                                                 state.Node.Name, NicenessFmt(state.Nice),
1625                                         )
1626                                 })
1627                                 return nil, err
1628                         }
1629                         pktName := Base32Codec.EncodeToString(freq.Hash[:])
1630                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"Offset", freq.Offset})
1631                         state.Ctx.LogD("sp-process-freq-queueing", lesp, func(les LEs) string {
1632                                 return fmt.Sprintf(
1633                                         "SP with %s (nice %s): FREQ %s: queuing",
1634                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1635                                 )
1636                         })
1637                         nice, exists := state.infosOurSeen[*freq.Hash]
1638                         if exists {
1639                                 if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
1640                                         state.Lock()
1641                                         insertIdx := 0
1642                                         var freqWithNice *FreqWithNice
1643                                         for insertIdx, freqWithNice = range state.queueTheir {
1644                                                 if freqWithNice.nice > nice {
1645                                                         break
1646                                                 }
1647                                         }
1648                                         state.queueTheir = append(state.queueTheir, nil)
1649                                         copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
1650                                         state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
1651                                         state.Unlock()
1652                                 } else {
1653                                         state.Ctx.LogD("sp-process-freq-skip", lesp, func(les LEs) string {
1654                                                 return fmt.Sprintf(
1655                                                         "SP with %s (nice %s): FREQ %s: skipping",
1656                                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1657                                                 )
1658                                         })
1659                                 }
1660                         } else {
1661                                 state.Ctx.LogD("sp-process-freq-unknown", lesp, func(les LEs) string {
1662                                         return fmt.Sprintf(
1663                                                 "SP with %s (nice %s): FREQ %s: unknown",
1664                                                 state.Node.Name, NicenessFmt(state.Nice), pktName,
1665                                         )
1666                                 })
1667                         }
1668
1669                 default:
1670                         state.Ctx.LogE(
1671                                 "sp-process-type-unknown",
1672                                 append(les, LE{"Type", head.Type}),
1673                                 errors.New("unknown type"),
1674                                 func(les LEs) string {
1675                                         return fmt.Sprintf(
1676                                                 "SP with %s (nice %s): %d",
1677                                                 state.Node.Name, NicenessFmt(state.Nice), head.Type,
1678                                         )
1679                                 },
1680                         )
1681                         return nil, BadPktType
1682                 }
1683         }
1684
1685         if infosGot {
1686                 var pkts int
1687                 var size uint64
1688                 state.RLock()
1689                 for _, info := range state.infosTheir {
1690                         pkts++
1691                         size += info.Size
1692                 }
1693                 state.RUnlock()
1694                 state.Ctx.LogI("sp-infos-rx", LEs{
1695                         {"XX", string(TRx)},
1696                         {"Node", state.Node.Id},
1697                         {"Pkts", pkts},
1698                         {"Size", int64(size)},
1699                 }, func(les LEs) string {
1700                         return fmt.Sprintf(
1701                                 "%s has got for us: %d packets, %s",
1702                                 state.Node.Name, pkts, humanize.IBytes(size),
1703                         )
1704                 })
1705         }
1706         return payloadsSplit(replies), nil
1707 }
1708
1709 func SPChecker(ctx *Ctx) {
1710         for t := range spCheckerTasks {
1711                 pktName := Base32Codec.EncodeToString(t.hsh[:])
1712                 les := LEs{
1713                         {"XX", string(TRx)},
1714                         {"Node", t.nodeId},
1715                         {"Pkt", pktName},
1716                 }
1717                 SPCheckerWg.Add(1)
1718                 ctx.LogD("sp-checker", les, func(les LEs) string {
1719                         return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(t.nodeId), pktName)
1720                 })
1721                 size, err := ctx.CheckNoCK(t.nodeId, t.hsh, t.mth)
1722                 les = append(les, LE{"Size", size})
1723                 if err != nil {
1724                         ctx.LogE("sp-checker", les, err, func(les LEs) string {
1725                                 return fmt.Sprintf(
1726                                         "Checksumming %s/rx/%s (%s)", ctx.NodeName(t.nodeId), pktName,
1727                                         humanize.IBytes(uint64(size)),
1728                                 )
1729                         })
1730                         SPCheckerWg.Done()
1731                         continue
1732                 }
1733                 ctx.LogI("sp-checker-done", les, func(les LEs) string {
1734                         return fmt.Sprintf(
1735                                 "Packet %s is retreived (%s)",
1736                                 pktName, humanize.IBytes(uint64(size)),
1737                         )
1738                 })
1739                 SPCheckerWg.Done()
1740                 go func(t SPCheckerTask) {
1741                         defer func() { recover() }()
1742                         t.done <- MarshalSP(SPTypeDone, SPDone{t.hsh})
1743                 }(t)
1744         }
1745 }