]> Cypherpunks.ru repositories - nncp.git/blob - src/sp.go
115b39ef61a2ceeba749cc8c9acfa5c722066489
[nncp.git] / src / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2023 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bytes"
22         "crypto/subtle"
23         "errors"
24         "fmt"
25         "io"
26         "log"
27         "os"
28         "path/filepath"
29         "sort"
30         "strconv"
31         "sync"
32         "time"
33
34         xdr "github.com/davecgh/go-xdr/xdr2"
35         "github.com/dustin/go-humanize"
36         "github.com/flynn/noise"
37 )
38
39 const (
40         MaxSPSize      = 1<<16 - 256
41         PartSuffix     = ".part"
42         SPHeadOverhead = 4
43         CfgDeadline    = "NNCPDEADLINE"
44 )
45
46 type MTHAndOffset struct {
47         mth    MTH
48         offset uint64
49 }
50
51 type SPCheckerTask struct {
52         nodeId *NodeId
53         hsh    *[MTHSize]byte
54         mth    MTH
55         done   chan []byte
56 }
57
58 var (
59         SPInfoOverhead    int
60         SPFreqOverhead    int
61         SPFileOverhead    int
62         SPHaltMarshalized []byte
63         SPPingMarshalized []byte
64
65         NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
66                 noise.DH25519,
67                 noise.CipherChaChaPoly,
68                 noise.HashBLAKE2b,
69         )
70
71         DefaultDeadline = 10 * time.Second
72         PingTimeout     = time.Minute
73
74         spCheckerTasks chan SPCheckerTask
75         SPCheckerWg    sync.WaitGroup
76         spCheckerOnce  sync.Once
77 )
78
79 type FdAndFullSize struct {
80         fd       *os.File
81         fullSize int64
82 }
83
84 type SPType uint8
85
86 const (
87         SPTypeInfo SPType = iota
88         SPTypeFreq SPType = iota
89         SPTypeFile SPType = iota
90         SPTypeDone SPType = iota
91         SPTypeHalt SPType = iota
92         SPTypePing SPType = iota
93 )
94
95 type SPHead struct {
96         Type SPType
97 }
98
99 type SPInfo struct {
100         Nice uint8
101         Size uint64
102         Hash *[MTHSize]byte
103 }
104
105 type SPFreq struct {
106         Hash   *[MTHSize]byte
107         Offset uint64
108 }
109
110 type SPFile struct {
111         Hash    *[MTHSize]byte
112         Offset  uint64
113         Payload []byte
114 }
115
116 type SPDone struct {
117         Hash *[MTHSize]byte
118 }
119
120 type SPRaw struct {
121         Magic   [8]byte
122         Payload []byte
123 }
124
125 type FreqWithNice struct {
126         freq *SPFreq
127         nice uint8
128 }
129
130 type ConnDeadlined interface {
131         io.ReadWriteCloser
132         SetReadDeadline(t time.Time) error
133         SetWriteDeadline(t time.Time) error
134 }
135
136 func init() {
137         if v := os.Getenv(CfgDeadline); v != "" {
138                 i, err := strconv.Atoi(v)
139                 if err != nil {
140                         log.Fatalln("Can not convert", CfgDeadline, "to integer:", err)
141                 }
142                 DefaultDeadline = time.Duration(i) * time.Second
143         }
144
145         var buf bytes.Buffer
146         spHead := SPHead{Type: SPTypeHalt}
147         if _, err := xdr.Marshal(&buf, spHead); err != nil {
148                 panic(err)
149         }
150         SPHaltMarshalized = make([]byte, SPHeadOverhead)
151         copy(SPHaltMarshalized, buf.Bytes())
152         buf.Reset()
153
154         spHead = SPHead{Type: SPTypePing}
155         if _, err := xdr.Marshal(&buf, spHead); err != nil {
156                 panic(err)
157         }
158         SPPingMarshalized = make([]byte, SPHeadOverhead)
159         copy(SPPingMarshalized, buf.Bytes())
160         buf.Reset()
161
162         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([MTHSize]byte)}
163         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
164                 panic(err)
165         }
166         SPInfoOverhead = buf.Len()
167         buf.Reset()
168
169         spFreq := SPFreq{Hash: new([MTHSize]byte), Offset: 123}
170         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
171                 panic(err)
172         }
173         SPFreqOverhead = buf.Len()
174         buf.Reset()
175
176         spFile := SPFile{Hash: new([MTHSize]byte), Offset: 123}
177         if _, err := xdr.Marshal(&buf, spFile); err != nil {
178                 panic(err)
179         }
180         SPFileOverhead = buf.Len()
181         spCheckerTasks = make(chan SPCheckerTask)
182 }
183
184 func MarshalSP(typ SPType, sp interface{}) []byte {
185         var buf bytes.Buffer
186         if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil {
187                 panic(err)
188         }
189         if _, err := xdr.Marshal(&buf, sp); err != nil {
190                 panic(err)
191         }
192         return buf.Bytes()
193 }
194
195 func payloadsSplit(payloads [][]byte) [][]byte {
196         var outbounds [][]byte
197         outbound := make([]byte, 0, MaxSPSize)
198         for i, payload := range payloads {
199                 outbound = append(outbound, payload...)
200                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
201                         outbounds = append(outbounds, outbound)
202                         outbound = make([]byte, 0, MaxSPSize)
203                 }
204         }
205         if len(outbound) > 0 {
206                 outbounds = append(outbounds, outbound)
207         }
208         return outbounds
209 }
210
211 type SPState struct {
212         Ctx            *Ctx
213         Node           *Node
214         Nice           uint8
215         NoCK           bool
216         onlineDeadline time.Duration
217         maxOnlineTime  time.Duration
218         hs             *noise.HandshakeState
219         csOur          *noise.CipherState
220         csTheir        *noise.CipherState
221         payloads       chan []byte
222         pings          chan struct{}
223         infosTheir     map[[MTHSize]byte]*SPInfo
224         infosOurSeen   map[[MTHSize]byte]uint8
225         queueTheir     []*FreqWithNice
226         wg             sync.WaitGroup
227         RxBytes        int64
228         RxLastSeen     time.Time
229         RxLastNonPing  time.Time
230         TxBytes        int64
231         TxLastSeen     time.Time
232         TxLastNonPing  time.Time
233         started        time.Time
234         mustFinishAt   time.Time
235         Duration       time.Duration
236         RxSpeed        int64
237         TxSpeed        int64
238         rxLock         *os.File
239         txLock         *os.File
240         xxOnly         TRxTx
241         rxRate         int
242         txRate         int
243         isDead         chan struct{}
244         listOnly       bool
245         onlyPkts       map[[MTHSize]byte]bool
246         writeSPBuf     bytes.Buffer
247         fds            map[string]FdAndFullSize
248         fdsLock        sync.RWMutex
249         fileHashers    map[string]*MTHAndOffset
250         progressBars   map[string]struct{}
251         sync.RWMutex
252 }
253
254 func (state *SPState) SetDead() {
255         state.Lock()
256         defer state.Unlock()
257         select {
258         case <-state.isDead:
259                 // Already closed channel, dead
260                 return
261         default:
262         }
263         close(state.isDead)
264         go func() {
265                 for range state.payloads {
266                 }
267         }()
268         go func() {
269                 for range state.pings {
270                 }
271         }()
272 }
273
274 func (state *SPState) NotAlive() bool {
275         select {
276         case <-state.isDead:
277                 return true
278         default:
279         }
280         return false
281 }
282
283 func (state *SPState) dirUnlock() {
284         state.Ctx.UnlockDir(state.rxLock)
285         state.Ctx.UnlockDir(state.txLock)
286 }
287
288 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
289         state.writeSPBuf.Reset()
290         if _, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
291                 Magic:   MagicNNCPSv1.B,
292                 Payload: payload,
293         }); err != nil {
294                 return err
295         }
296         n, err := dst.Write(state.writeSPBuf.Bytes())
297         if err != nil {
298                 return err
299         }
300         state.TxLastSeen = time.Now()
301         state.TxBytes += int64(n)
302         if !ping {
303                 state.TxLastNonPing = state.TxLastSeen
304         }
305         return nil
306 }
307
308 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
309         var sp SPRaw
310         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
311         if err != nil {
312                 ue := err.(*xdr.UnmarshalError)
313                 if ue.Err == io.EOF {
314                         return nil, ue.Err
315                 }
316                 return nil, err
317         }
318         state.RxLastSeen = time.Now()
319         state.RxBytes += int64(n)
320         if sp.Magic != MagicNNCPSv1.B {
321                 return nil, BadMagic
322         }
323         return sp.Payload, nil
324 }
325
326 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[MTHSize]byte]uint8) [][]byte {
327         var infos []*SPInfo
328         var totalSize int64
329         for job := range ctx.Jobs(nodeId, TTx) {
330                 if job.PktEnc.Nice > nice {
331                         continue
332                 }
333                 if _, known := (*seen)[*job.HshValue]; known {
334                         continue
335                 }
336                 totalSize += job.Size
337                 infos = append(infos, &SPInfo{
338                         Nice: job.PktEnc.Nice,
339                         Size: uint64(job.Size),
340                         Hash: job.HshValue,
341                 })
342                 (*seen)[*job.HshValue] = job.PktEnc.Nice
343         }
344         sort.Sort(ByNice(infos))
345         var payloads [][]byte
346         for _, info := range infos {
347                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
348                 pktName := Base32Codec.EncodeToString(info.Hash[:])
349                 ctx.LogD("sp-info-our", LEs{
350                         {"Node", nodeId},
351                         {"Name", pktName},
352                         {"Size", info.Size},
353                 }, func(les LEs) string {
354                         return fmt.Sprintf(
355                                 "Our info: %s/tx/%s (%s)",
356                                 ctx.NodeName(nodeId),
357                                 pktName,
358                                 humanize.IBytes(info.Size),
359                         )
360                 })
361         }
362         if totalSize > 0 {
363                 ctx.LogI("sp-infos-tx", LEs{
364                         {"XX", string(TTx)},
365                         {"Node", nodeId},
366                         {"Pkts", len(payloads)},
367                         {"Size", totalSize},
368                 }, func(les LEs) string {
369                         return fmt.Sprintf(
370                                 "We have got for %s: %d packets, %s",
371                                 ctx.NodeName(nodeId),
372                                 len(payloads),
373                                 humanize.IBytes(uint64(totalSize)),
374                         )
375                 })
376         }
377         return payloadsSplit(payloads)
378 }
379
380 func (state *SPState) StartI(conn ConnDeadlined) error {
381         nodeId := state.Node.Id
382         err := state.Ctx.ensureRxDir(nodeId)
383         if err != nil {
384                 return err
385         }
386         var rxLock *os.File
387         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
388                 rxLock, err = state.Ctx.LockDir(nodeId, string(TRx))
389                 if err != nil {
390                         return err
391                 }
392         }
393         var txLock *os.File
394         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
395                 txLock, err = state.Ctx.LockDir(nodeId, string(TTx))
396                 if err != nil {
397                         return err
398                 }
399         }
400         started := time.Now()
401         conf := noise.Config{
402                 CipherSuite: NoiseCipherSuite,
403                 Pattern:     noise.HandshakeIK,
404                 Initiator:   true,
405                 StaticKeypair: noise.DHKey{
406                         Private: state.Ctx.Self.NoisePrv[:],
407                         Public:  state.Ctx.Self.NoisePub[:],
408                 },
409                 PeerStatic: state.Node.NoisePub[:],
410         }
411         hs, err := noise.NewHandshakeState(conf)
412         if err != nil {
413                 return err
414         }
415         state.hs = hs
416         state.payloads = make(chan []byte)
417         state.pings = make(chan struct{})
418         state.infosTheir = make(map[[MTHSize]byte]*SPInfo)
419         state.infosOurSeen = make(map[[MTHSize]byte]uint8)
420         state.progressBars = make(map[string]struct{})
421         state.started = started
422         state.rxLock = rxLock
423         state.txLock = txLock
424
425         var infosPayloads [][]byte
426         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
427                 infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
428         }
429         var firstPayload []byte
430         if len(infosPayloads) > 0 {
431                 firstPayload = infosPayloads[0]
432         }
433         // Pad first payload, to hide actual number of existing files
434         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
435                 firstPayload = append(firstPayload, SPHaltMarshalized...)
436         }
437
438         var buf []byte
439         var payload []byte
440         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
441         if err != nil {
442                 state.dirUnlock()
443                 return err
444         }
445         les := LEs{{"Node", nodeId}, {"Nice", int(state.Nice)}}
446         state.Ctx.LogD("sp-startI", les, func(les LEs) string {
447                 return fmt.Sprintf(
448                         "SP with %s (nice %s): sending first message",
449                         state.Node.Name,
450                         NicenessFmt(state.Nice),
451                 )
452         })
453         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
454         if err = state.WriteSP(conn, buf, false); err != nil {
455                 state.Ctx.LogE("sp-startI", les, err, func(les LEs) string {
456                         return fmt.Sprintf(
457                                 "SP with %s (nice %s): writing",
458                                 state.Node.Name,
459                                 NicenessFmt(state.Nice),
460                         )
461                 })
462                 state.dirUnlock()
463                 return err
464         }
465         state.Ctx.LogD("sp-startI-wait", les, func(les LEs) string {
466                 return fmt.Sprintf(
467                         "SP with %s (nice %s): waiting for first message",
468                         state.Node.Name,
469                         NicenessFmt(state.Nice),
470                 )
471         })
472         conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
473         if buf, err = state.ReadSP(conn); err != nil {
474                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
475                         return fmt.Sprintf(
476                                 "SP with %s (nice %s): reading",
477                                 state.Node.Name,
478                                 NicenessFmt(state.Nice),
479                         )
480                 })
481                 state.dirUnlock()
482                 return err
483         }
484         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
485         if err != nil {
486                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
487                         return fmt.Sprintf(
488                                 "SP with %s (nice %s): reading Noise message",
489                                 state.Node.Name,
490                                 NicenessFmt(state.Nice),
491                         )
492                 })
493                 state.dirUnlock()
494                 return err
495         }
496         state.Ctx.LogD("sp-startI-workers", les, func(les LEs) string {
497                 return fmt.Sprintf(
498                         "SP with %s (nice %s): starting workers",
499                         state.Node.Name,
500                         NicenessFmt(state.Nice),
501                 )
502         })
503         err = state.StartWorkers(conn, infosPayloads, payload)
504         if err != nil {
505                 state.Ctx.LogE("sp-startI-workers", les, err, func(les LEs) string {
506                         return fmt.Sprintf(
507                                 "SP with %s (nice %s): starting workers",
508                                 state.Node.Name,
509                                 NicenessFmt(state.Nice),
510                         )
511                 })
512                 state.dirUnlock()
513         }
514         return err
515 }
516
517 func (state *SPState) StartR(conn ConnDeadlined) error {
518         started := time.Now()
519         conf := noise.Config{
520                 CipherSuite: NoiseCipherSuite,
521                 Pattern:     noise.HandshakeIK,
522                 Initiator:   false,
523                 StaticKeypair: noise.DHKey{
524                         Private: state.Ctx.Self.NoisePrv[:],
525                         Public:  state.Ctx.Self.NoisePub[:],
526                 },
527         }
528         hs, err := noise.NewHandshakeState(conf)
529         if err != nil {
530                 return err
531         }
532         xxOnly := TRxTx("")
533         state.hs = hs
534         state.payloads = make(chan []byte)
535         state.pings = make(chan struct{})
536         state.infosOurSeen = make(map[[MTHSize]byte]uint8)
537         state.infosTheir = make(map[[MTHSize]byte]*SPInfo)
538         state.progressBars = make(map[string]struct{})
539         state.started = started
540         state.xxOnly = xxOnly
541
542         var buf []byte
543         var payload []byte
544         logMsg := func(les LEs) string {
545                 return fmt.Sprintf(
546                         "SP nice %s: waiting for first message",
547                         NicenessFmt(state.Nice),
548                 )
549         }
550         les := LEs{{"Nice", int(state.Nice)}}
551         state.Ctx.LogD("sp-startR", les, logMsg)
552         conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
553         if buf, err = state.ReadSP(conn); err != nil {
554                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
555                 return err
556         }
557         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
558                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
559                 return err
560         }
561
562         var node *Node
563         for _, n := range state.Ctx.Neigh {
564                 if n.NoisePub == nil {
565                         continue
566                 }
567                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
568                         node = n
569                         break
570                 }
571         }
572         if node == nil {
573                 peerId := Base32Codec.EncodeToString(state.hs.PeerStatic())
574                 err = errors.New("unknown peer: " + peerId)
575                 state.Ctx.LogE("sp-startR-unknown", append(les, LE{"Peer", peerId}), err, logMsg)
576                 return err
577         }
578         state.Node = node
579         state.rxRate = node.RxRate
580         state.txRate = node.TxRate
581         state.onlineDeadline = node.OnlineDeadline
582         state.maxOnlineTime = node.MaxOnlineTime
583         les = LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}}
584
585         if err = state.Ctx.ensureRxDir(node.Id); err != nil {
586                 return err
587         }
588         var rxLock *os.File
589         if xxOnly == "" || xxOnly == TRx {
590                 rxLock, err = state.Ctx.LockDir(node.Id, string(TRx))
591                 if err != nil {
592                         return err
593                 }
594         }
595         state.rxLock = rxLock
596         var txLock *os.File
597         if xxOnly == "" || xxOnly == TTx {
598                 txLock, err = state.Ctx.LockDir(node.Id, string(TTx))
599                 if err != nil {
600                         return err
601                 }
602         }
603         state.txLock = txLock
604
605         var infosPayloads [][]byte
606         if xxOnly == "" || xxOnly == TTx {
607                 infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
608         }
609         var firstPayload []byte
610         if len(infosPayloads) > 0 {
611                 firstPayload = infosPayloads[0]
612         }
613         // Pad first payload, to hide actual number of existing files
614         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
615                 firstPayload = append(firstPayload, SPHaltMarshalized...)
616         }
617
618         state.Ctx.LogD("sp-startR-write", les, func(les LEs) string {
619                 return fmt.Sprintf(
620                         "SP with %s (nice %s): sending first message",
621                         node.Name, NicenessFmt(state.Nice),
622                 )
623         })
624         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
625         if err != nil {
626                 state.dirUnlock()
627                 return err
628         }
629         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
630         if err = state.WriteSP(conn, buf, false); err != nil {
631                 state.Ctx.LogE("sp-startR-write", les, err, func(les LEs) string {
632                         return fmt.Sprintf(
633                                 "SP with %s (nice %s): writing",
634                                 node.Name, NicenessFmt(state.Nice),
635                         )
636                 })
637                 state.dirUnlock()
638                 return err
639         }
640         state.Ctx.LogD("sp-startR-workers", les, func(les LEs) string {
641                 return fmt.Sprintf(
642                         "SP with %s (nice %s): starting workers",
643                         node.Name, NicenessFmt(state.Nice),
644                 )
645         })
646         err = state.StartWorkers(conn, infosPayloads, payload)
647         if err != nil {
648                 state.dirUnlock()
649         }
650         return err
651 }
652
653 func (state *SPState) closeFd(pth string) {
654         state.fdsLock.Lock()
655         if s, exists := state.fds[pth]; exists {
656                 delete(state.fds, pth)
657                 s.fd.Close()
658         }
659         state.fdsLock.Unlock()
660 }
661
662 func (state *SPState) StartWorkers(
663         conn ConnDeadlined,
664         infosPayloads [][]byte,
665         payload []byte,
666 ) error {
667         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
668         state.fds = make(map[string]FdAndFullSize)
669         state.fileHashers = make(map[string]*MTHAndOffset)
670         state.isDead = make(chan struct{})
671         if state.maxOnlineTime > 0 {
672                 state.mustFinishAt = state.started.Add(state.maxOnlineTime)
673         }
674         if !state.NoCK {
675                 spCheckerOnce.Do(func() { go SPChecker(state.Ctx) })
676                 go func() {
677                         for job := range state.Ctx.JobsNoCK(state.Node.Id) {
678                                 if job.PktEnc.Nice <= state.Nice {
679                                         spCheckerTasks <- SPCheckerTask{
680                                                 nodeId: state.Node.Id,
681                                                 hsh:    job.HshValue,
682                                                 done:   state.payloads,
683                                         }
684                                 }
685                         }
686                 }()
687         }
688
689         // Remaining handshake payload sending
690         if len(infosPayloads) > 1 {
691                 state.wg.Add(1)
692                 go func() {
693                         for _, payload := range infosPayloads[1:] {
694                                 state.Ctx.LogD(
695                                         "sp-queue-remaining",
696                                         append(les, LE{"Size", int64(len(payload))}),
697                                         func(les LEs) string {
698                                                 return fmt.Sprintf(
699                                                         "SP with %s (nice %s): queuing remaining payload (%s)",
700                                                         state.Node.Name, NicenessFmt(state.Nice),
701                                                         humanize.IBytes(uint64(len(payload))),
702                                                 )
703                                         },
704                                 )
705                                 state.payloads <- payload
706                         }
707                         state.wg.Done()
708                 }()
709         }
710
711         // Processing of first payload and queueing its responses
712         logMsg := func(les LEs) string {
713                 return fmt.Sprintf(
714                         "SP with %s (nice %s): processing first payload (%s)",
715                         state.Node.Name, NicenessFmt(state.Nice),
716                         humanize.IBytes(uint64(len(payload))),
717                 )
718         }
719         state.Ctx.LogD("sp-process", append(les, LE{"Size", int64(len(payload))}), logMsg)
720         replies, err := state.ProcessSP(payload)
721         if err != nil {
722                 state.Ctx.LogE("sp-process", les, err, logMsg)
723                 return err
724         }
725         state.wg.Add(1)
726         go func() {
727                 for _, reply := range replies {
728                         state.Ctx.LogD(
729                                 "sp-queue-reply",
730                                 append(les, LE{"Size", int64(len(reply))}),
731                                 func(les LEs) string {
732                                         return fmt.Sprintf(
733                                                 "SP with %s (nice %s): queuing reply (%s)",
734                                                 state.Node.Name, NicenessFmt(state.Nice),
735                                                 humanize.IBytes(uint64(len(payload))),
736                                         )
737                                 },
738                         )
739                         state.payloads <- reply
740                 }
741                 state.wg.Done()
742         }()
743
744         // Periodic jobs
745         state.wg.Add(1)
746         go func() {
747                 deadlineTicker := time.NewTicker(time.Second)
748                 pingTicker := time.NewTicker(PingTimeout)
749                 for {
750                         select {
751                         case <-state.isDead:
752                                 state.wg.Done()
753                                 deadlineTicker.Stop()
754                                 pingTicker.Stop()
755                                 return
756                         case now := <-deadlineTicker.C:
757                                 if now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
758                                         now.Sub(state.TxLastNonPing) >= state.onlineDeadline {
759                                         goto Deadlined
760                                 }
761                                 if state.maxOnlineTime > 0 && state.mustFinishAt.Before(now) {
762                                         goto Deadlined
763                                 }
764                                 if now.Sub(state.RxLastSeen) >= 2*PingTimeout {
765                                         goto Deadlined
766                                 }
767                                 break
768                         Deadlined:
769                                 state.SetDead()
770                                 conn.Close()
771                         case now := <-pingTicker.C:
772                                 if now.After(state.TxLastSeen.Add(PingTimeout)) {
773                                         state.wg.Add(1)
774                                         go func() {
775                                                 state.pings <- struct{}{}
776                                                 state.wg.Done()
777                                         }()
778                                 }
779                         }
780                 }
781         }()
782
783         // Spool checker and INFOs sender of appearing files
784         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
785                 state.wg.Add(1)
786                 go func() {
787                         dw, err := state.Ctx.NewDirWatcher(
788                                 filepath.Join(state.Ctx.Spool, state.Node.Id.String(), string(TTx)),
789                                 time.Second,
790                         )
791                         if err != nil {
792                                 state.Ctx.LogE("sp-queue-dir-watch", les, err, logMsg)
793                                 log.Fatalln(err)
794                         }
795                         for {
796                                 select {
797                                 case <-state.isDead:
798                                         dw.Close()
799                                         state.wg.Done()
800                                         return
801                                 case <-dw.C:
802                                         for _, payload := range state.Ctx.infosOur(
803                                                 state.Node.Id,
804                                                 state.Nice,
805                                                 &state.infosOurSeen,
806                                         ) {
807                                                 state.Ctx.LogD(
808                                                         "sp-queue-info",
809                                                         append(les, LE{"Size", int64(len(payload))}),
810                                                         func(les LEs) string {
811                                                                 return fmt.Sprintf(
812                                                                         "SP with %s (nice %s): queuing new info (%s)",
813                                                                         state.Node.Name, NicenessFmt(state.Nice),
814                                                                         humanize.IBytes(uint64(len(payload))),
815                                                                 )
816                                                         },
817                                                 )
818                                                 state.payloads <- payload
819                                         }
820                                 }
821                         }
822                 }()
823         }
824
825         // Sender
826         state.wg.Add(1)
827         go func() {
828                 defer conn.Close()
829                 defer state.SetDead()
830                 defer state.wg.Done()
831                 buf := make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
832                 for {
833                         if state.NotAlive() {
834                                 return
835                         }
836                         var payload []byte
837                         var ping bool
838                         select {
839                         case <-state.pings:
840                                 state.Ctx.LogD("sp-got-ping", les, func(les LEs) string {
841                                         return fmt.Sprintf(
842                                                 "SP with %s (nice %s): got ping",
843                                                 state.Node.Name, NicenessFmt(state.Nice),
844                                         )
845                                 })
846                                 payload = SPPingMarshalized
847                                 ping = true
848                         case payload = <-state.payloads:
849                                 state.Ctx.LogD(
850                                         "sp-got-payload",
851                                         append(les, LE{"Size", int64(len(payload))}),
852                                         func(les LEs) string {
853                                                 return fmt.Sprintf(
854                                                         "SP with %s (nice %s): got payload (%s)",
855                                                         state.Node.Name, NicenessFmt(state.Nice),
856                                                         humanize.IBytes(uint64(len(payload))),
857                                                 )
858                                         },
859                                 )
860                         default:
861                                 state.RLock()
862                                 if len(state.queueTheir) == 0 {
863                                         state.RUnlock()
864                                         time.Sleep(100 * time.Millisecond)
865                                         continue
866                                 }
867                                 freq := state.queueTheir[0].freq
868                                 state.RUnlock()
869                                 if state.txRate > 0 {
870                                         time.Sleep(time.Second / time.Duration(state.txRate))
871                                 }
872                                 pktName := Base32Codec.EncodeToString(freq.Hash[:])
873                                 lesp := append(
874                                         les,
875                                         LE{"XX", string(TTx)},
876                                         LE{"Pkt", pktName},
877                                         LE{"Size", int64(freq.Offset)},
878                                 )
879                                 logMsg := func(les LEs) string {
880                                         return fmt.Sprintf(
881                                                 "SP with %s (nice %s): tx/%s (%s)",
882                                                 state.Node.Name, NicenessFmt(state.Nice),
883                                                 pktName,
884                                                 humanize.IBytes(freq.Offset),
885                                         )
886                                 }
887                                 state.Ctx.LogD("sp-queue", lesp, func(les LEs) string {
888                                         return logMsg(les) + ": queueing"
889                                 })
890                                 pth := filepath.Join(
891                                         state.Ctx.Spool,
892                                         state.Node.Id.String(),
893                                         string(TTx),
894                                         Base32Codec.EncodeToString(freq.Hash[:]),
895                                 )
896                                 state.fdsLock.RLock()
897                                 fdAndFullSize, exists := state.fds[pth]
898                                 state.fdsLock.RUnlock()
899                                 if !exists {
900                                         state.Ctx.LogD("sp-queue-open", lesp, func(les LEs) string {
901                                                 return logMsg(les) + ": opening"
902                                         })
903                                         fd, err := os.Open(pth)
904                                         if err != nil {
905                                                 state.Ctx.LogE("sp-queue-open", lesp, err, func(les LEs) string {
906                                                         return logMsg(les) + ": opening"
907                                                 })
908                                                 return
909                                         }
910                                         fi, err := fd.Stat()
911                                         if err != nil {
912                                                 state.Ctx.LogE("sp-queue-stat", lesp, err, func(les LEs) string {
913                                                         return logMsg(les) + ": stating"
914                                                 })
915                                                 return
916                                         }
917                                         fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
918                                         state.fdsLock.Lock()
919                                         state.fds[pth] = fdAndFullSize
920                                         state.fdsLock.Unlock()
921                                 }
922                                 fd := fdAndFullSize.fd
923                                 fullSize := fdAndFullSize.fullSize
924                                 lesp = append(lesp, LE{"FullSize", fullSize})
925                                 var bufRead []byte
926                                 if freq.Offset < uint64(fullSize) {
927                                         state.Ctx.LogD("sp-file-seek", lesp, func(les LEs) string {
928                                                 return logMsg(les) + ": seeking"
929                                         })
930                                         if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
931                                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
932                                                         return logMsg(les) + ": seeking"
933                                                 })
934                                                 return
935                                         }
936                                         n, err := fd.Read(buf)
937                                         if err != nil {
938                                                 state.Ctx.LogE("sp-file-read", lesp, err, func(les LEs) string {
939                                                         return logMsg(les) + ": reading"
940                                                 })
941                                                 return
942                                         }
943                                         bufRead = buf[:n]
944                                         lesp = append(
945                                                 les,
946                                                 LE{"XX", string(TTx)},
947                                                 LE{"Pkt", pktName},
948                                                 LE{"Size", int64(n)},
949                                                 LE{"FullSize", fullSize},
950                                         )
951                                         state.Ctx.LogD("sp-file-read", lesp, func(les LEs) string {
952                                                 return fmt.Sprintf(
953                                                         "%s: read %s",
954                                                         logMsg(les), humanize.IBytes(uint64(n)),
955                                                 )
956                                         })
957                                 } else {
958                                         state.closeFd(pth)
959                                 }
960                                 payload = MarshalSP(SPTypeFile, SPFile{
961                                         Hash:    freq.Hash,
962                                         Offset:  freq.Offset,
963                                         Payload: bufRead,
964                                 })
965                                 ourSize := freq.Offset + uint64(len(bufRead))
966                                 lesp = append(
967                                         les,
968                                         LE{"XX", string(TTx)},
969                                         LE{"Pkt", pktName},
970                                         LE{"Size", int64(ourSize)},
971                                         LE{"FullSize", fullSize},
972                                 )
973                                 if state.Ctx.ShowPrgrs {
974                                         state.progressBars[pktName] = struct{}{}
975                                         Progress("Tx", lesp)
976                                 }
977                                 if ourSize == uint64(fullSize) {
978                                         state.closeFd(pth)
979                                         state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string {
980                                                 return logMsg(les) + ": finished"
981                                         })
982                                         if state.Ctx.ShowPrgrs {
983                                                 delete(state.progressBars, pktName)
984                                         }
985                                 }
986                                 state.Lock()
987                                 for i, q := range state.queueTheir {
988                                         if *q.freq.Hash != *freq.Hash {
989                                                 continue
990                                         }
991                                         if ourSize == uint64(fullSize) {
992                                                 state.queueTheir = append(
993                                                         state.queueTheir[:i],
994                                                         state.queueTheir[i+1:]...,
995                                                 )
996                                         } else {
997                                                 q.freq.Offset = ourSize
998                                         }
999                                         break
1000                                 }
1001                                 state.Unlock()
1002                         }
1003                         logMsg := func(les LEs) string {
1004                                 return fmt.Sprintf(
1005                                         "SP with %s (nice %s): sending %s",
1006                                         state.Node.Name, NicenessFmt(state.Nice),
1007                                         humanize.IBytes(uint64(len(payload))),
1008                                 )
1009                         }
1010                         state.Ctx.LogD("sp-sending", append(les, LE{"Size", int64(len(payload))}), logMsg)
1011                         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
1012                         ct, err := state.csOur.Encrypt(nil, nil, payload)
1013                         if err != nil {
1014                                 state.Ctx.LogE("sp-encrypting", les, err, logMsg)
1015                                 return
1016                         }
1017                         if err := state.WriteSP(conn, ct, ping); err != nil {
1018                                 state.Ctx.LogE("sp-sending", les, err, logMsg)
1019                                 return
1020                         }
1021                 }
1022         }()
1023
1024         // Receiver
1025         state.wg.Add(1)
1026         go func() {
1027                 for {
1028                         if state.NotAlive() {
1029                                 break
1030                         }
1031                         logMsg := func(les LEs) string {
1032                                 return fmt.Sprintf(
1033                                         "SP with %s (nice %s): waiting for payload",
1034                                         state.Node.Name, NicenessFmt(state.Nice),
1035                                 )
1036                         }
1037                         state.Ctx.LogD("sp-recv-wait", les, logMsg)
1038                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
1039                         payload, err := state.ReadSP(conn)
1040                         if err != nil {
1041                                 if err == io.EOF {
1042                                         break
1043                                 }
1044                                 unmarshalErr := err.(*xdr.UnmarshalError)
1045                                 if os.IsTimeout(unmarshalErr.Err) {
1046                                         continue
1047                                 }
1048                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
1049                                         break
1050                                 }
1051                                 state.Ctx.LogE("sp-recv-wait", les, err, logMsg)
1052                                 break
1053                         }
1054                         logMsg = func(les LEs) string {
1055                                 return fmt.Sprintf(
1056                                         "SP with %s (nice %s): payload (%s)",
1057                                         state.Node.Name, NicenessFmt(state.Nice),
1058                                         humanize.IBytes(uint64(len(payload))),
1059                                 )
1060                         }
1061                         state.Ctx.LogD(
1062                                 "sp-recv-got",
1063                                 append(les, LE{"Size", int64(len(payload))}),
1064                                 func(les LEs) string { return logMsg(les) + ": got" },
1065                         )
1066                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
1067                         if err != nil {
1068                                 state.Ctx.LogE("sp-recv-got", les, err, func(les LEs) string {
1069                                         return logMsg(les) + ": got"
1070                                 })
1071                                 break
1072                         }
1073                         state.Ctx.LogD(
1074                                 "sp-recv-process",
1075                                 append(les, LE{"Size", int64(len(payload))}),
1076                                 func(les LEs) string {
1077                                         return logMsg(les) + ": processing"
1078                                 },
1079                         )
1080                         replies, err := state.ProcessSP(payload)
1081                         if err != nil {
1082                                 state.Ctx.LogE("sp-recv-process", les, err, func(les LEs) string {
1083                                         return logMsg(les) + ": processing"
1084                                 })
1085                                 break
1086                         }
1087                         state.wg.Add(1)
1088                         go func() {
1089                                 for _, reply := range replies {
1090                                         state.Ctx.LogD(
1091                                                 "sp-recv-reply",
1092                                                 append(les[:len(les)-1], LE{"Size", int64(len(reply))}),
1093                                                 func(les LEs) string {
1094                                                         return fmt.Sprintf(
1095                                                                 "SP with %s (nice %s): queuing reply (%s)",
1096                                                                 state.Node.Name, NicenessFmt(state.Nice),
1097                                                                 humanize.IBytes(uint64(len(reply))),
1098                                                         )
1099                                                 },
1100                                         )
1101                                         state.payloads <- reply
1102                                 }
1103                                 state.wg.Done()
1104                         }()
1105                         if state.rxRate > 0 {
1106                                 time.Sleep(time.Second / time.Duration(state.rxRate))
1107                         }
1108                 }
1109                 state.SetDead()
1110                 state.wg.Done()
1111                 state.SetDead()
1112                 conn.Close()
1113         }()
1114
1115         return nil
1116 }
1117
1118 func (state *SPState) Wait() bool {
1119         state.wg.Wait()
1120         close(state.payloads)
1121         close(state.pings)
1122         state.Duration = time.Now().Sub(state.started)
1123         state.dirUnlock()
1124         state.RxSpeed = state.RxBytes
1125         state.TxSpeed = state.TxBytes
1126         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
1127         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
1128         if rxDuration > 0 {
1129                 state.RxSpeed = state.RxBytes / rxDuration
1130         }
1131         if txDuration > 0 {
1132                 state.TxSpeed = state.TxBytes / txDuration
1133         }
1134         nothingLeft := len(state.queueTheir) == 0
1135         for _, s := range state.fds {
1136                 nothingLeft = false
1137                 s.fd.Close()
1138         }
1139         for pktName := range state.progressBars {
1140                 ProgressKill(pktName)
1141         }
1142         return nothingLeft
1143 }
1144
1145 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
1146         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
1147         r := bytes.NewReader(payload)
1148         var err error
1149         var replies [][]byte
1150         var infosGot bool
1151         for r.Len() > 0 {
1152                 state.Ctx.LogD("sp-process-unmarshal", les, func(les LEs) string {
1153                         return fmt.Sprintf(
1154                                 "SP with %s (nice %s): unmarshaling header",
1155                                 state.Node.Name, NicenessFmt(state.Nice),
1156                         )
1157                 })
1158                 var head SPHead
1159                 if _, err = xdr.Unmarshal(r, &head); err != nil {
1160                         state.Ctx.LogE("sp-process-unmarshal", les, err, func(les LEs) string {
1161                                 return fmt.Sprintf(
1162                                         "SP with %s (nice %s): unmarshaling header",
1163                                         state.Node.Name, NicenessFmt(state.Nice),
1164                                 )
1165                         })
1166                         return nil, err
1167                 }
1168                 if head.Type != SPTypePing {
1169                         state.RxLastNonPing = state.RxLastSeen
1170                 }
1171                 switch head.Type {
1172                 case SPTypeHalt:
1173                         state.Ctx.LogD(
1174                                 "sp-process-halt",
1175                                 append(les, LE{"Type", "halt"}), func(les LEs) string {
1176                                         return fmt.Sprintf(
1177                                                 "SP with %s (nice %s): got HALT",
1178                                                 state.Node.Name, NicenessFmt(state.Nice),
1179                                         )
1180                                 },
1181                         )
1182                         state.Lock()
1183                         state.queueTheir = nil
1184                         state.Unlock()
1185
1186                 case SPTypePing:
1187                         state.Ctx.LogD(
1188                                 "sp-process-ping",
1189                                 append(les, LE{"Type", "ping"}),
1190                                 func(les LEs) string {
1191                                         return fmt.Sprintf(
1192                                                 "SP with %s (nice %s): got PING",
1193                                                 state.Node.Name, NicenessFmt(state.Nice),
1194                                         )
1195                                 },
1196                         )
1197
1198                 case SPTypeInfo:
1199                         infosGot = true
1200                         lesp := append(les, LE{"Type", "info"})
1201                         state.Ctx.LogD(
1202                                 "sp-process-info-unmarshal", lesp,
1203                                 func(les LEs) string {
1204                                         return fmt.Sprintf(
1205                                                 "SP with %s (nice %s): unmarshaling INFO",
1206                                                 state.Node.Name, NicenessFmt(state.Nice),
1207                                         )
1208                                 },
1209                         )
1210                         var info SPInfo
1211                         if _, err = xdr.Unmarshal(r, &info); err != nil {
1212                                 state.Ctx.LogE(
1213                                         "sp-process-info-unmarshal", lesp, err,
1214                                         func(les LEs) string {
1215                                                 return fmt.Sprintf(
1216                                                         "SP with %s (nice %s): unmarshaling INFO",
1217                                                         state.Node.Name, NicenessFmt(state.Nice),
1218                                                 )
1219                                         },
1220                                 )
1221                                 return nil, err
1222                         }
1223                         pktName := Base32Codec.EncodeToString(info.Hash[:])
1224                         lesp = append(
1225                                 lesp,
1226                                 LE{"Pkt", pktName},
1227                                 LE{"Size", int64(info.Size)},
1228                                 LE{"PktNice", int(info.Nice)},
1229                         )
1230                         logMsg := func(les LEs) string {
1231                                 return fmt.Sprintf(
1232                                         "SP with %s (nice %s): INFO %s (%s) nice %s",
1233                                         state.Node.Name, NicenessFmt(state.Nice),
1234                                         pktName,
1235                                         humanize.IBytes(info.Size),
1236                                         NicenessFmt(info.Nice),
1237                                 )
1238                         }
1239                         if !state.listOnly && info.Nice > state.Nice {
1240                                 state.Ctx.LogD("sp-process-info-too-nice", lesp, func(les LEs) string {
1241                                         return logMsg(les) + ": too nice"
1242                                 })
1243                                 continue
1244                         }
1245                         state.Ctx.LogD("sp-process-info-got", lesp, func(les LEs) string {
1246                                 return logMsg(les) + ": received"
1247                         })
1248                         if !state.listOnly && state.xxOnly == TTx {
1249                                 continue
1250                         }
1251                         state.Lock()
1252                         state.infosTheir[*info.Hash] = &info
1253                         state.Unlock()
1254                         state.Ctx.LogD("sp-process-info-stat", lesp, func(les LEs) string {
1255                                 return logMsg(les) + ": stating part"
1256                         })
1257                         pktPath := filepath.Join(
1258                                 state.Ctx.Spool,
1259                                 state.Node.Id.String(),
1260                                 string(TRx),
1261                                 Base32Codec.EncodeToString(info.Hash[:]),
1262                         )
1263                         logMsg = func(les LEs) string {
1264                                 return fmt.Sprintf(
1265                                         "Packet %s (%s) (nice %s)",
1266                                         pktName,
1267                                         humanize.IBytes(info.Size),
1268                                         NicenessFmt(info.Nice),
1269                                 )
1270                         }
1271                         if _, err = os.Stat(pktPath); err == nil {
1272                                 state.Ctx.LogI("sp-info-done", lesp, func(les LEs) string {
1273                                         return logMsg(les) + ": already done"
1274                                 })
1275                                 if !state.listOnly {
1276                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1277                                 }
1278                                 continue
1279                         }
1280                         if _, err = os.Stat(filepath.Join(
1281                                 state.Ctx.Spool, state.Node.Id.String(), string(TRx),
1282                                 SeenDir, Base32Codec.EncodeToString(info.Hash[:]),
1283                         )); err == nil {
1284                                 state.Ctx.LogI("sp-info-seen", lesp, func(les LEs) string {
1285                                         return logMsg(les) + ": already seen"
1286                                 })
1287                                 if !state.listOnly {
1288                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1289                                 }
1290                                 continue
1291                         }
1292                         if _, err = os.Stat(pktPath + NoCKSuffix); err == nil {
1293                                 state.Ctx.LogI("sp-info-nock", lesp, func(les LEs) string {
1294                                         return logMsg(les) + ": still not checksummed"
1295                                 })
1296                                 continue
1297                         }
1298                         fi, err := os.Stat(pktPath + PartSuffix)
1299                         var offset int64
1300                         if err == nil {
1301                                 offset = fi.Size()
1302                         }
1303                         if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
1304                                 state.Ctx.LogI("sp-info-no-space", lesp, func(les LEs) string {
1305                                         return logMsg(les) + ": not enough space"
1306                                 })
1307                                 continue
1308                         }
1309                         state.Ctx.LogI(
1310                                 "sp-info",
1311                                 append(lesp, LE{"Offset", offset}),
1312                                 func(les LEs) string {
1313                                         return fmt.Sprintf(
1314                                                 "%s: %d%%", logMsg(les), 100*uint64(offset)/info.Size,
1315                                         )
1316                                 },
1317                         )
1318                         if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
1319                                 replies = append(replies, MarshalSP(
1320                                         SPTypeFreq,
1321                                         SPFreq{info.Hash, uint64(offset)},
1322                                 ))
1323                         }
1324
1325                 case SPTypeFile:
1326                         lesp := append(les, LE{"Type", "file"})
1327                         state.Ctx.LogD("sp-process-file", lesp, func(les LEs) string {
1328                                 return fmt.Sprintf(
1329                                         "SP with %s (nice %s): unmarshaling FILE",
1330                                         state.Node.Name, NicenessFmt(state.Nice),
1331                                 )
1332                         })
1333                         var file SPFile
1334                         if _, err = xdr.Unmarshal(r, &file); err != nil {
1335                                 state.Ctx.LogE("sp-process-file", lesp, err, func(les LEs) string {
1336                                         return fmt.Sprintf(
1337                                                 "SP with %s (nice %s): unmarshaling FILE",
1338                                                 state.Node.Name, NicenessFmt(state.Nice),
1339                                         )
1340                                 })
1341                                 return nil, err
1342                         }
1343                         pktName := Base32Codec.EncodeToString(file.Hash[:])
1344                         lesp = append(
1345                                 lesp,
1346                                 LE{"XX", string(TRx)},
1347                                 LE{"Pkt", pktName},
1348                                 LE{"Size", int64(len(file.Payload))},
1349                         )
1350                         logMsg := func(les LEs) string {
1351                                 return fmt.Sprintf(
1352                                         "Got packet %s (%s)",
1353                                         pktName, humanize.IBytes(uint64(len(file.Payload))),
1354                                 )
1355                         }
1356                         fullsize := int64(0)
1357                         state.RLock()
1358                         infoTheir := state.infosTheir[*file.Hash]
1359                         state.RUnlock()
1360                         if infoTheir == nil {
1361                                 state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string {
1362                                         return logMsg(les) + ": unknown file"
1363                                 })
1364                                 continue
1365                         }
1366                         fullsize = int64(infoTheir.Size)
1367                         lesp = append(lesp, LE{"FullSize", fullsize})
1368                         dirToSync := filepath.Join(
1369                                 state.Ctx.Spool,
1370                                 state.Node.Id.String(),
1371                                 string(TRx),
1372                         )
1373                         filePath := filepath.Join(dirToSync, pktName)
1374                         filePathPart := filePath + PartSuffix
1375                         state.Ctx.LogD("sp-file-open", lesp, func(les LEs) string {
1376                                 return logMsg(les) + ": opening part"
1377                         })
1378                         state.fdsLock.RLock()
1379                         fdAndFullSize, exists := state.fds[filePathPart]
1380                         state.fdsLock.RUnlock()
1381                         hasherAndOffset := state.fileHashers[filePath]
1382                         var fd *os.File
1383                         if exists {
1384                                 fd = fdAndFullSize.fd
1385                         } else {
1386                                 fd, err = os.OpenFile(
1387                                         filePathPart,
1388                                         os.O_RDWR|os.O_CREATE,
1389                                         os.FileMode(0666),
1390                                 )
1391                                 if err != nil {
1392                                         state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string {
1393                                                 return logMsg(les) + ": opening part"
1394                                         })
1395                                         return nil, err
1396                                 }
1397                                 state.fdsLock.Lock()
1398                                 state.fds[filePathPart] = FdAndFullSize{fd: fd}
1399                                 state.fdsLock.Unlock()
1400                                 if !state.NoCK {
1401                                         hasherAndOffset = &MTHAndOffset{
1402                                                 mth:    MTHNew(fullsize, int64(file.Offset)),
1403                                                 offset: file.Offset,
1404                                         }
1405                                         state.fileHashers[filePath] = hasherAndOffset
1406                                 }
1407                         }
1408                         state.Ctx.LogD(
1409                                 "sp-file-seek",
1410                                 append(lesp, LE{"Offset", file.Offset}),
1411                                 func(les LEs) string {
1412                                         return fmt.Sprintf("%s: seeking %d", logMsg(les), file.Offset)
1413                                 })
1414                         if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
1415                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1416                                         return logMsg(les) + ": seeking"
1417                                 })
1418                                 state.closeFd(filePathPart)
1419                                 return nil, err
1420                         }
1421                         state.Ctx.LogD("sp-file-write", lesp, func(les LEs) string {
1422                                 return logMsg(les) + ": writing"
1423                         })
1424                         if _, err = fd.Write(file.Payload); err != nil {
1425                                 state.Ctx.LogE("sp-file-write", lesp, err, func(les LEs) string {
1426                                         return logMsg(les) + ": writing"
1427                                 })
1428                                 state.closeFd(filePathPart)
1429                                 return nil, err
1430                         }
1431                         if hasherAndOffset != nil {
1432                                 if hasherAndOffset.offset == file.Offset {
1433                                         if _, err = hasherAndOffset.mth.Write(file.Payload); err != nil {
1434                                                 panic(err)
1435                                         }
1436                                         hasherAndOffset.offset += uint64(len(file.Payload))
1437                                 } else {
1438                                         state.Ctx.LogE(
1439                                                 "sp-file-offset-differs", lesp, errors.New("offset differs"),
1440                                                 func(les LEs) string {
1441                                                         return logMsg(les) + ": deleting hasher"
1442                                                 },
1443                                         )
1444                                         delete(state.fileHashers, filePath)
1445                                         hasherAndOffset = nil
1446                                 }
1447                         }
1448                         ourSize := int64(file.Offset + uint64(len(file.Payload)))
1449                         lesp[len(lesp)-2].V = ourSize
1450                         if state.Ctx.ShowPrgrs {
1451                                 state.progressBars[pktName] = struct{}{}
1452                                 Progress("Rx", lesp)
1453                         }
1454                         if fullsize != ourSize {
1455                                 continue
1456                         }
1457                         if state.Ctx.ShowPrgrs {
1458                                 delete(state.progressBars, pktName)
1459                         }
1460                         logMsg = func(les LEs) string {
1461                                 return fmt.Sprintf(
1462                                         "Got packet %s %d%% (%s / %s)",
1463                                         pktName, 100*ourSize/fullsize,
1464                                         humanize.IBytes(uint64(ourSize)),
1465                                         humanize.IBytes(uint64(fullsize)),
1466                                 )
1467                         }
1468                         if !NoSync {
1469                                 err = fd.Sync()
1470                                 if err != nil {
1471                                         state.Ctx.LogE("sp-file-sync", lesp, err, func(les LEs) string {
1472                                                 return logMsg(les) + ": syncing"
1473                                         })
1474                                         state.closeFd(filePathPart)
1475                                         continue
1476                                 }
1477                         }
1478                         if hasherAndOffset != nil {
1479                                 delete(state.fileHashers, filePath)
1480                                 if hasherAndOffset.mth.PreaddSize() == 0 {
1481                                         if !bytes.Equal(hasherAndOffset.mth.Sum(nil), file.Hash[:]) {
1482                                                 state.Ctx.LogE(
1483                                                         "sp-file-bad-checksum", lesp,
1484                                                         errors.New("checksum mismatch"),
1485                                                         logMsg,
1486                                                 )
1487                                                 state.closeFd(filePathPart)
1488                                                 continue
1489                                         }
1490                                         if err = os.Rename(filePathPart, filePath); err != nil {
1491                                                 state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1492                                                         return logMsg(les) + ": renaming"
1493                                                 })
1494                                                 state.closeFd(filePathPart)
1495                                                 continue
1496                                         }
1497                                         if err = DirSync(dirToSync); err != nil {
1498                                                 state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1499                                                         return logMsg(les) + ": dirsyncing"
1500                                                 })
1501                                                 state.closeFd(filePathPart)
1502                                                 continue
1503                                         }
1504                                         state.Ctx.LogI("sp-file-done", lesp, func(les LEs) string {
1505                                                 return logMsg(les) + ": done"
1506                                         })
1507                                         state.wg.Add(1)
1508                                         go func() {
1509                                                 state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
1510                                                 state.wg.Done()
1511                                         }()
1512                                         state.Lock()
1513                                         delete(state.infosTheir, *file.Hash)
1514                                         state.Unlock()
1515                                         if !state.Ctx.HdrUsage {
1516                                                 continue
1517                                         }
1518                                         if _, err = fd.Seek(0, io.SeekStart); err != nil {
1519                                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1520                                                         return logMsg(les) + ": seeking"
1521                                                 })
1522                                                 state.closeFd(filePathPart)
1523                                                 continue
1524                                         }
1525                                         _, pktEncRaw, err := state.Ctx.HdrRead(fd)
1526                                         state.closeFd(filePathPart)
1527                                         if err != nil {
1528                                                 state.Ctx.LogE("sp-file-hdr-read", lesp, err, func(les LEs) string {
1529                                                         return logMsg(les) + ": HdrReading"
1530                                                 })
1531                                                 continue
1532                                         }
1533                                         state.Ctx.HdrWrite(pktEncRaw, filePath)
1534                                         continue
1535                                 }
1536                         }
1537                         state.closeFd(filePathPart)
1538                         if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil {
1539                                 state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1540                                         return logMsg(les) + ": renaming"
1541                                 })
1542                                 continue
1543                         }
1544                         if err = DirSync(dirToSync); err != nil {
1545                                 state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1546                                         return logMsg(les) + ": dirsyncing"
1547                                 })
1548                                 continue
1549                         }
1550                         state.Ctx.LogI("sp-file-downloaded", lesp, func(les LEs) string {
1551                                 return logMsg(les) + ": downloaded"
1552                         })
1553                         state.Lock()
1554                         delete(state.infosTheir, *file.Hash)
1555                         state.Unlock()
1556                         go func() {
1557                                 t := SPCheckerTask{
1558                                         nodeId: state.Node.Id,
1559                                         hsh:    file.Hash,
1560                                         done:   state.payloads,
1561                                 }
1562                                 if hasherAndOffset != nil {
1563                                         t.mth = hasherAndOffset.mth
1564                                 }
1565                                 spCheckerTasks <- t
1566                         }()
1567
1568                 case SPTypeDone:
1569                         lesp := append(les, LE{"Type", "done"})
1570                         state.Ctx.LogD("sp-process-done-unmarshal", lesp, func(les LEs) string {
1571                                 return fmt.Sprintf(
1572                                         "SP with %s (nice %s): unmarshaling DONE",
1573                                         state.Node.Name, NicenessFmt(state.Nice),
1574                                 )
1575                         })
1576                         var done SPDone
1577                         if _, err = xdr.Unmarshal(r, &done); err != nil {
1578                                 state.Ctx.LogE("sp-process-done-unmarshal", lesp, err, func(les LEs) string {
1579                                         return fmt.Sprintf(
1580                                                 "SP with %s (nice %s): unmarshaling DONE",
1581                                                 state.Node.Name, NicenessFmt(state.Nice),
1582                                         )
1583                                 })
1584                                 return nil, err
1585                         }
1586                         pktName := Base32Codec.EncodeToString(done.Hash[:])
1587                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"XX", string(TTx)})
1588                         logMsg := func(les LEs) string {
1589                                 return fmt.Sprintf(
1590                                         "SP with %s (nice %s): DONE: removing %s",
1591                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1592                                 )
1593                         }
1594                         state.Ctx.LogD("sp-done", lesp, logMsg)
1595                         pth := filepath.Join(
1596                                 state.Ctx.Spool,
1597                                 state.Node.Id.String(),
1598                                 string(TTx),
1599                                 pktName,
1600                         )
1601                         if err = os.Remove(pth); err == nil {
1602                                 state.Ctx.LogI("sp-done", lesp, func(les LEs) string {
1603                                         return fmt.Sprintf("Packet %s is sent", pktName)
1604                                 })
1605                                 if state.Ctx.HdrUsage {
1606                                         os.Remove(JobPath2Hdr(pth))
1607                                 }
1608                         } else {
1609                                 state.Ctx.LogE("sp-done", lesp, err, logMsg)
1610                         }
1611
1612                 case SPTypeFreq:
1613                         lesp := append(les, LE{"Type", "freq"})
1614                         state.Ctx.LogD("sp-process-freq", lesp, func(les LEs) string {
1615                                 return fmt.Sprintf(
1616                                         "SP with %s (nice %s): unmarshaling FREQ",
1617                                         state.Node.Name, NicenessFmt(state.Nice),
1618                                 )
1619                         })
1620                         var freq SPFreq
1621                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
1622                                 state.Ctx.LogE("sp-process-freq", lesp, err, func(les LEs) string {
1623                                         return fmt.Sprintf(
1624                                                 "SP with %s (nice %s): unmarshaling FREQ",
1625                                                 state.Node.Name, NicenessFmt(state.Nice),
1626                                         )
1627                                 })
1628                                 return nil, err
1629                         }
1630                         pktName := Base32Codec.EncodeToString(freq.Hash[:])
1631                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"Offset", freq.Offset})
1632                         state.Ctx.LogD("sp-process-freq-queueing", lesp, func(les LEs) string {
1633                                 return fmt.Sprintf(
1634                                         "SP with %s (nice %s): FREQ %s: queuing",
1635                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1636                                 )
1637                         })
1638                         nice, exists := state.infosOurSeen[*freq.Hash]
1639                         if exists {
1640                                 if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
1641                                         state.Lock()
1642                                         insertIdx := 0
1643                                         var freqWithNice *FreqWithNice
1644                                         for insertIdx, freqWithNice = range state.queueTheir {
1645                                                 if freqWithNice.nice > nice {
1646                                                         break
1647                                                 }
1648                                         }
1649                                         state.queueTheir = append(state.queueTheir, nil)
1650                                         copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
1651                                         state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
1652                                         state.Unlock()
1653                                 } else {
1654                                         state.Ctx.LogD("sp-process-freq-skip", lesp, func(les LEs) string {
1655                                                 return fmt.Sprintf(
1656                                                         "SP with %s (nice %s): FREQ %s: skipping",
1657                                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1658                                                 )
1659                                         })
1660                                 }
1661                         } else {
1662                                 state.Ctx.LogD("sp-process-freq-unknown", lesp, func(les LEs) string {
1663                                         return fmt.Sprintf(
1664                                                 "SP with %s (nice %s): FREQ %s: unknown",
1665                                                 state.Node.Name, NicenessFmt(state.Nice), pktName,
1666                                         )
1667                                 })
1668                         }
1669
1670                 default:
1671                         state.Ctx.LogE(
1672                                 "sp-process-type-unknown",
1673                                 append(les, LE{"Type", head.Type}),
1674                                 errors.New("unknown type"),
1675                                 func(les LEs) string {
1676                                         return fmt.Sprintf(
1677                                                 "SP with %s (nice %s): %d",
1678                                                 state.Node.Name, NicenessFmt(state.Nice), head.Type,
1679                                         )
1680                                 },
1681                         )
1682                         return nil, BadPktType
1683                 }
1684         }
1685
1686         if infosGot {
1687                 var pkts int
1688                 var size uint64
1689                 state.RLock()
1690                 for _, info := range state.infosTheir {
1691                         pkts++
1692                         size += info.Size
1693                 }
1694                 state.RUnlock()
1695                 state.Ctx.LogI("sp-infos-rx", LEs{
1696                         {"XX", string(TRx)},
1697                         {"Node", state.Node.Id},
1698                         {"Pkts", pkts},
1699                         {"Size", int64(size)},
1700                 }, func(les LEs) string {
1701                         return fmt.Sprintf(
1702                                 "%s has got for us: %d packets, %s",
1703                                 state.Node.Name, pkts, humanize.IBytes(size),
1704                         )
1705                 })
1706         }
1707         return payloadsSplit(replies), nil
1708 }
1709
1710 func SPChecker(ctx *Ctx) {
1711         for t := range spCheckerTasks {
1712                 pktName := Base32Codec.EncodeToString(t.hsh[:])
1713                 les := LEs{
1714                         {"XX", string(TRx)},
1715                         {"Node", t.nodeId},
1716                         {"Pkt", pktName},
1717                 }
1718                 SPCheckerWg.Add(1)
1719                 ctx.LogD("sp-checker", les, func(les LEs) string {
1720                         return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(t.nodeId), pktName)
1721                 })
1722                 size, err := ctx.CheckNoCK(t.nodeId, t.hsh, t.mth)
1723                 les = append(les, LE{"Size", size})
1724                 if err != nil {
1725                         ctx.LogE("sp-checker", les, err, func(les LEs) string {
1726                                 return fmt.Sprintf(
1727                                         "Checksumming %s/rx/%s (%s)", ctx.NodeName(t.nodeId), pktName,
1728                                         humanize.IBytes(uint64(size)),
1729                                 )
1730                         })
1731                         SPCheckerWg.Done()
1732                         continue
1733                 }
1734                 ctx.LogI("sp-checker-done", les, func(les LEs) string {
1735                         return fmt.Sprintf(
1736                                 "Packet %s is retreived (%s)",
1737                                 pktName, humanize.IBytes(uint64(size)),
1738                         )
1739                 })
1740                 SPCheckerWg.Done()
1741                 go func(t SPCheckerTask) {
1742                         defer func() { recover() }()
1743                         t.done <- MarshalSP(SPTypeDone, SPDone{t.hsh})
1744                 }(t)
1745         }
1746 }