]> Cypherpunks.ru repositories - nncp.git/blob - src/sp.go
aca6f18c1824caf6d5908609b88c22841cb29c59
[nncp.git] / src / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2020 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bytes"
22         "crypto/subtle"
23         "errors"
24         "io"
25         "os"
26         "path/filepath"
27         "sort"
28         "sync"
29         "time"
30
31         xdr "github.com/davecgh/go-xdr/xdr2"
32         "github.com/flynn/noise"
33 )
34
35 const (
36         MaxSPSize      = 1<<16 - 256
37         PartSuffix     = ".part"
38         SPHeadOverhead = 4
39 )
40
41 var (
42         MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}
43
44         SPInfoOverhead    int
45         SPFreqOverhead    int
46         SPFileOverhead    int
47         SPHaltMarshalized []byte
48         SPPingMarshalized []byte
49
50         NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
51                 noise.DH25519,
52                 noise.CipherChaChaPoly,
53                 noise.HashBLAKE2b,
54         )
55
56         DefaultDeadline = 10 * time.Second
57         PingTimeout     = time.Minute
58
59         spWorkersGroup sync.WaitGroup
60 )
61
62 type SPType uint8
63
64 const (
65         SPTypeInfo SPType = iota
66         SPTypeFreq SPType = iota
67         SPTypeFile SPType = iota
68         SPTypeDone SPType = iota
69         SPTypeHalt SPType = iota
70         SPTypePing SPType = iota
71 )
72
73 type SPHead struct {
74         Type SPType
75 }
76
77 type SPInfo struct {
78         Nice uint8
79         Size uint64
80         Hash *[32]byte
81 }
82
83 type SPFreq struct {
84         Hash   *[32]byte
85         Offset uint64
86 }
87
88 type SPFile struct {
89         Hash    *[32]byte
90         Offset  uint64
91         Payload []byte
92 }
93
94 type SPDone struct {
95         Hash *[32]byte
96 }
97
98 type SPRaw struct {
99         Magic   [8]byte
100         Payload []byte
101 }
102
103 type FreqWithNice struct {
104         freq *SPFreq
105         nice uint8
106 }
107
108 type ConnDeadlined interface {
109         io.ReadWriteCloser
110         SetReadDeadline(t time.Time) error
111         SetWriteDeadline(t time.Time) error
112 }
113
114 func init() {
115         var buf bytes.Buffer
116         spHead := SPHead{Type: SPTypeHalt}
117         if _, err := xdr.Marshal(&buf, spHead); err != nil {
118                 panic(err)
119         }
120         SPHaltMarshalized = make([]byte, SPHeadOverhead)
121         copy(SPHaltMarshalized, buf.Bytes())
122         buf.Reset()
123
124         spHead = SPHead{Type: SPTypePing}
125         if _, err := xdr.Marshal(&buf, spHead); err != nil {
126                 panic(err)
127         }
128         SPPingMarshalized = make([]byte, SPHeadOverhead)
129         copy(SPPingMarshalized, buf.Bytes())
130         buf.Reset()
131
132         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
133         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
134                 panic(err)
135         }
136         SPInfoOverhead = buf.Len()
137         buf.Reset()
138
139         spFreq := SPFreq{Hash: new([32]byte), Offset: 123}
140         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
141                 panic(err)
142         }
143         SPFreqOverhead = buf.Len()
144         buf.Reset()
145
146         spFile := SPFile{Hash: new([32]byte), Offset: 123}
147         if _, err := xdr.Marshal(&buf, spFile); err != nil {
148                 panic(err)
149         }
150         SPFileOverhead = buf.Len()
151 }
152
153 func MarshalSP(typ SPType, sp interface{}) []byte {
154         var buf bytes.Buffer
155         if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil {
156                 panic(err)
157         }
158         if _, err := xdr.Marshal(&buf, sp); err != nil {
159                 panic(err)
160         }
161         return buf.Bytes()
162 }
163
164 func payloadsSplit(payloads [][]byte) [][]byte {
165         var outbounds [][]byte
166         outbound := make([]byte, 0, MaxSPSize)
167         for i, payload := range payloads {
168                 outbound = append(outbound, payload...)
169                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
170                         outbounds = append(outbounds, outbound)
171                         outbound = make([]byte, 0, MaxSPSize)
172                 }
173         }
174         if len(outbound) > 0 {
175                 outbounds = append(outbounds, outbound)
176         }
177         return outbounds
178 }
179
180 type SPState struct {
181         Ctx            *Ctx
182         Node           *Node
183         Nice           uint8
184         onlineDeadline time.Duration
185         maxOnlineTime  time.Duration
186         hs             *noise.HandshakeState
187         csOur          *noise.CipherState
188         csTheir        *noise.CipherState
189         payloads       chan []byte
190         pings          chan struct{}
191         infosTheir     map[[32]byte]*SPInfo
192         infosOurSeen   map[[32]byte]uint8
193         queueTheir     []*FreqWithNice
194         wg             sync.WaitGroup
195         RxBytes        int64
196         RxLastSeen     time.Time
197         RxLastNonPing  time.Time
198         TxBytes        int64
199         TxLastSeen     time.Time
200         TxLastNonPing  time.Time
201         started        time.Time
202         mustFinishAt   time.Time
203         Duration       time.Duration
204         RxSpeed        int64
205         TxSpeed        int64
206         rxLock         *os.File
207         txLock         *os.File
208         xxOnly         TRxTx
209         rxRate         int
210         txRate         int
211         isDead         chan struct{}
212         listOnly       bool
213         onlyPkts       map[[32]byte]bool
214         writeSPBuf     bytes.Buffer
215         sync.RWMutex
216 }
217
218 func (state *SPState) SetDead() {
219         state.Lock()
220         defer state.Unlock()
221         select {
222         case <-state.isDead:
223                 // Already closed channel, dead
224                 return
225         default:
226         }
227         close(state.isDead)
228         go func() {
229                 for _ = range state.payloads {
230                 }
231         }()
232         go func() {
233                 for _ = range state.pings {
234                 }
235         }()
236 }
237
238 func (state *SPState) NotAlive() bool {
239         select {
240         case <-state.isDead:
241                 return true
242         default:
243         }
244         return false
245 }
246
247 func (state *SPState) dirUnlock() {
248         state.Ctx.UnlockDir(state.rxLock)
249         state.Ctx.UnlockDir(state.txLock)
250 }
251
252 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
253         state.writeSPBuf.Reset()
254         n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
255                 Magic:   MagicNNCPLv1,
256                 Payload: payload,
257         })
258         if err != nil {
259                 return err
260         }
261         if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil {
262                 state.TxLastSeen = time.Now()
263                 state.TxBytes += int64(n)
264                 if !ping {
265                         state.TxLastNonPing = state.TxLastSeen
266                 }
267         }
268         return err
269 }
270
271 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
272         var sp SPRaw
273         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
274         if err != nil {
275                 ue := err.(*xdr.UnmarshalError)
276                 if ue.Err == io.EOF {
277                         return nil, ue.Err
278                 }
279                 return nil, err
280         }
281         state.RxLastSeen = time.Now()
282         state.RxBytes += int64(n)
283         if sp.Magic != MagicNNCPLv1 {
284                 return nil, BadMagic
285         }
286         return sp.Payload, nil
287 }
288
289 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
290         var infos []*SPInfo
291         var totalSize int64
292         for job := range ctx.Jobs(nodeId, TTx) {
293                 job.Fd.Close()
294                 if job.PktEnc.Nice > nice {
295                         continue
296                 }
297                 if _, known := (*seen)[*job.HshValue]; known {
298                         continue
299                 }
300                 totalSize += job.Size
301                 infos = append(infos, &SPInfo{
302                         Nice: job.PktEnc.Nice,
303                         Size: uint64(job.Size),
304                         Hash: job.HshValue,
305                 })
306                 (*seen)[*job.HshValue] = job.PktEnc.Nice
307         }
308         sort.Sort(ByNice(infos))
309         var payloads [][]byte
310         for _, info := range infos {
311                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
312                 ctx.LogD("sp-info-our", SDS{
313                         "node": nodeId,
314                         "name": ToBase32(info.Hash[:]),
315                         "size": info.Size,
316                 }, "")
317         }
318         if totalSize > 0 {
319                 ctx.LogI("sp-infos", SDS{
320                         "xx":   string(TTx),
321                         "node": nodeId,
322                         "pkts": len(payloads),
323                         "size": totalSize,
324                 }, "")
325         }
326         return payloadsSplit(payloads)
327 }
328
329 func (state *SPState) StartI(conn ConnDeadlined) error {
330         nodeId := state.Node.Id
331         err := state.Ctx.ensureRxDir(nodeId)
332         if err != nil {
333                 return err
334         }
335         var rxLock *os.File
336         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
337                 rxLock, err = state.Ctx.LockDir(nodeId, string(TRx))
338                 if err != nil {
339                         return err
340                 }
341         }
342         var txLock *os.File
343         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
344                 txLock, err = state.Ctx.LockDir(nodeId, string(TTx))
345                 if err != nil {
346                         return err
347                 }
348         }
349         started := time.Now()
350         conf := noise.Config{
351                 CipherSuite: NoiseCipherSuite,
352                 Pattern:     noise.HandshakeIK,
353                 Initiator:   true,
354                 StaticKeypair: noise.DHKey{
355                         Private: state.Ctx.Self.NoisePrv[:],
356                         Public:  state.Ctx.Self.NoisePub[:],
357                 },
358                 PeerStatic: state.Node.NoisePub[:],
359         }
360         hs, err := noise.NewHandshakeState(conf)
361         if err != nil {
362                 return err
363         }
364         state.hs = hs
365         state.payloads = make(chan []byte)
366         state.pings = make(chan struct{})
367         state.infosTheir = make(map[[32]byte]*SPInfo)
368         state.infosOurSeen = make(map[[32]byte]uint8)
369         state.started = started
370         state.rxLock = rxLock
371         state.txLock = txLock
372
373         var infosPayloads [][]byte
374         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
375                 infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
376         }
377         var firstPayload []byte
378         if len(infosPayloads) > 0 {
379                 firstPayload = infosPayloads[0]
380         }
381         // Pad first payload, to hide actual number of existing files
382         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
383                 firstPayload = append(firstPayload, SPHaltMarshalized...)
384         }
385
386         var buf []byte
387         var payload []byte
388         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
389         if err != nil {
390                 state.dirUnlock()
391                 return err
392         }
393         sds := SDS{"node": nodeId, "nice": int(state.Nice)}
394         state.Ctx.LogD("sp-start", sds, "sending first message")
395         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
396         if err = state.WriteSP(conn, buf, false); err != nil {
397                 state.Ctx.LogE("sp-start", sds, err, "")
398                 state.dirUnlock()
399                 return err
400         }
401         state.Ctx.LogD("sp-start", sds, "waiting for first message")
402         conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
403         if buf, err = state.ReadSP(conn); err != nil {
404                 state.Ctx.LogE("sp-start", sds, err, "")
405                 state.dirUnlock()
406                 return err
407         }
408         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
409         if err != nil {
410                 state.Ctx.LogE("sp-start", sds, err, "")
411                 state.dirUnlock()
412                 return err
413         }
414         state.Ctx.LogD("sp-start", sds, "starting workers")
415         err = state.StartWorkers(conn, infosPayloads, payload)
416         if err != nil {
417                 state.Ctx.LogE("sp-start", sds, err, "")
418                 state.dirUnlock()
419         }
420         return err
421 }
422
423 func (state *SPState) StartR(conn ConnDeadlined) error {
424         started := time.Now()
425         conf := noise.Config{
426                 CipherSuite: NoiseCipherSuite,
427                 Pattern:     noise.HandshakeIK,
428                 Initiator:   false,
429                 StaticKeypair: noise.DHKey{
430                         Private: state.Ctx.Self.NoisePrv[:],
431                         Public:  state.Ctx.Self.NoisePub[:],
432                 },
433         }
434         hs, err := noise.NewHandshakeState(conf)
435         if err != nil {
436                 return err
437         }
438         xxOnly := TRxTx("")
439         state.hs = hs
440         state.payloads = make(chan []byte)
441         state.pings = make(chan struct{})
442         state.infosOurSeen = make(map[[32]byte]uint8)
443         state.infosTheir = make(map[[32]byte]*SPInfo)
444         state.started = started
445         state.xxOnly = xxOnly
446         var buf []byte
447         var payload []byte
448         state.Ctx.LogD("sp-start", SDS{"nice": int(state.Nice)}, "waiting for first message")
449         conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
450         if buf, err = state.ReadSP(conn); err != nil {
451                 state.Ctx.LogE("sp-start", SDS{}, err, "")
452                 return err
453         }
454         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
455                 state.Ctx.LogE("sp-start", SDS{}, err, "")
456                 return err
457         }
458
459         var node *Node
460         for _, n := range state.Ctx.Neigh {
461                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
462                         node = n
463                         break
464                 }
465         }
466         if node == nil {
467                 peerId := ToBase32(state.hs.PeerStatic())
468                 state.Ctx.LogE("sp-start", SDS{"peer": peerId}, errors.New("unknown"), "")
469                 return errors.New("Unknown peer: " + peerId)
470         }
471         state.Node = node
472         state.rxRate = node.RxRate
473         state.txRate = node.TxRate
474         state.onlineDeadline = node.OnlineDeadline
475         state.maxOnlineTime = node.MaxOnlineTime
476         sds := SDS{"node": node.Id, "nice": int(state.Nice)}
477
478         if state.Ctx.ensureRxDir(node.Id); err != nil {
479                 return err
480         }
481         var rxLock *os.File
482         if xxOnly == "" || xxOnly == TRx {
483                 rxLock, err = state.Ctx.LockDir(node.Id, string(TRx))
484                 if err != nil {
485                         return err
486                 }
487         }
488         state.rxLock = rxLock
489         var txLock *os.File
490         if xxOnly == "" || xxOnly == TTx {
491                 txLock, err = state.Ctx.LockDir(node.Id, string(TTx))
492                 if err != nil {
493                         return err
494                 }
495         }
496         state.txLock = txLock
497
498         var infosPayloads [][]byte
499         if xxOnly == "" || xxOnly == TTx {
500                 infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
501         }
502         var firstPayload []byte
503         if len(infosPayloads) > 0 {
504                 firstPayload = infosPayloads[0]
505         }
506         // Pad first payload, to hide actual number of existing files
507         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
508                 firstPayload = append(firstPayload, SPHaltMarshalized...)
509         }
510
511         state.Ctx.LogD("sp-start", sds, "sending first message")
512         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
513         if err != nil {
514                 state.dirUnlock()
515                 return err
516         }
517         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
518         if err = state.WriteSP(conn, buf, false); err != nil {
519                 state.Ctx.LogE("sp-start", sds, err, "")
520                 state.dirUnlock()
521                 return err
522         }
523         state.Ctx.LogD("sp-start", sds, "starting workers")
524         err = state.StartWorkers(conn, infosPayloads, payload)
525         if err != nil {
526                 state.dirUnlock()
527         }
528         return err
529 }
530
531 func (state *SPState) StartWorkers(
532         conn ConnDeadlined,
533         infosPayloads [][]byte,
534         payload []byte,
535 ) error {
536         sds := SDS{"node": state.Node.Id, "nice": int(state.Nice)}
537         state.isDead = make(chan struct{})
538         if state.maxOnlineTime > 0 {
539                 state.mustFinishAt = state.started.Add(state.maxOnlineTime)
540         }
541
542         // Remaining handshake payload sending
543         if len(infosPayloads) > 1 {
544                 state.wg.Add(1)
545                 go func() {
546                         for _, payload := range infosPayloads[1:] {
547                                 state.Ctx.LogD(
548                                         "sp-work",
549                                         SdsAdd(sds, SDS{"size": len(payload)}),
550                                         "queuing remaining payload",
551                                 )
552                                 state.payloads <- payload
553                         }
554                         state.wg.Done()
555                 }()
556         }
557
558         // Processing of first payload and queueing its responses
559         state.Ctx.LogD(
560                 "sp-work",
561                 SdsAdd(sds, SDS{"size": len(payload)}),
562                 "processing first payload",
563         )
564         replies, err := state.ProcessSP(payload)
565         if err != nil {
566                 state.Ctx.LogE("sp-work", sds, err, "")
567                 return err
568         }
569         state.wg.Add(1)
570         go func() {
571                 for _, reply := range replies {
572                         state.Ctx.LogD(
573                                 "sp-work",
574                                 SdsAdd(sds, SDS{"size": len(reply)}),
575                                 "queuing reply",
576                         )
577                         state.payloads <- reply
578                 }
579                 state.wg.Done()
580         }()
581
582         // Periodic jobs
583         state.wg.Add(1)
584         go func() {
585                 deadlineTicker := time.NewTicker(time.Second)
586                 pingTicker := time.NewTicker(PingTimeout)
587                 for {
588                         select {
589                         case <-state.isDead:
590                                 state.wg.Done()
591                                 deadlineTicker.Stop()
592                                 pingTicker.Stop()
593                                 return
594                         case now := <-deadlineTicker.C:
595                                 if (now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
596                                         now.Sub(state.TxLastNonPing) >= state.onlineDeadline) ||
597                                         (state.maxOnlineTime > 0 && state.mustFinishAt.Before(now)) ||
598                                         (now.Sub(state.RxLastSeen) >= 2*PingTimeout) {
599                                         state.SetDead()
600                                         conn.Close()
601                                 }
602                         case now := <-pingTicker.C:
603                                 if now.After(state.TxLastSeen.Add(PingTimeout)) {
604                                         state.wg.Add(1)
605                                         go func() {
606                                                 state.pings <- struct{}{}
607                                                 state.wg.Done()
608                                         }()
609                                 }
610                         }
611                 }
612         }()
613
614         // Spool checker and INFOs sender of appearing files
615         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
616                 state.wg.Add(1)
617                 go func() {
618                         ticker := time.NewTicker(time.Second)
619                         for {
620                                 select {
621                                 case <-state.isDead:
622                                         state.wg.Done()
623                                         ticker.Stop()
624                                         return
625                                 case <-ticker.C:
626                                         for _, payload := range state.Ctx.infosOur(
627                                                 state.Node.Id,
628                                                 state.Nice,
629                                                 &state.infosOurSeen,
630                                         ) {
631                                                 state.Ctx.LogD(
632                                                         "sp-work",
633                                                         SdsAdd(sds, SDS{"size": len(payload)}),
634                                                         "queuing new info",
635                                                 )
636                                                 state.payloads <- payload
637                                         }
638                                 }
639                         }
640                 }()
641         }
642
643         // Sender
644         state.wg.Add(1)
645         go func() {
646                 defer conn.Close()
647                 defer state.SetDead()
648                 defer state.wg.Done()
649                 for {
650                         if state.NotAlive() {
651                                 return
652                         }
653                         var payload []byte
654                         var ping bool
655                         select {
656                         case <-state.pings:
657                                 state.Ctx.LogD("sp-xmit", sds, "got ping")
658                                 payload = SPPingMarshalized
659                                 ping = true
660                         case payload = <-state.payloads:
661                                 state.Ctx.LogD(
662                                         "sp-xmit",
663                                         SdsAdd(sds, SDS{"size": len(payload)}),
664                                         "got payload",
665                                 )
666                         default:
667                                 state.RLock()
668                                 if len(state.queueTheir) == 0 {
669                                         state.RUnlock()
670                                         time.Sleep(100 * time.Millisecond)
671                                         continue
672                                 }
673                                 freq := state.queueTheir[0].freq
674                                 state.RUnlock()
675                                 if state.txRate > 0 {
676                                         time.Sleep(time.Second / time.Duration(state.txRate))
677                                 }
678                                 sdsp := SdsAdd(sds, SDS{
679                                         "xx":   string(TTx),
680                                         "pkt":  ToBase32(freq.Hash[:]),
681                                         "size": int64(freq.Offset),
682                                 })
683                                 state.Ctx.LogD("sp-file", sdsp, "queueing")
684                                 fd, err := os.Open(filepath.Join(
685                                         state.Ctx.Spool,
686                                         state.Node.Id.String(),
687                                         string(TTx),
688                                         ToBase32(freq.Hash[:]),
689                                 ))
690                                 if err != nil {
691                                         state.Ctx.LogE("sp-file", sdsp, err, "")
692                                         return
693                                 }
694                                 fi, err := fd.Stat()
695                                 if err != nil {
696                                         state.Ctx.LogE("sp-file", sdsp, err, "")
697                                         return
698                                 }
699                                 fullSize := fi.Size()
700                                 var buf []byte
701                                 if freq.Offset < uint64(fullSize) {
702                                         state.Ctx.LogD("sp-file", sdsp, "seeking")
703                                         if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
704                                                 state.Ctx.LogE("sp-file", sdsp, err, "")
705                                                 return
706                                         }
707                                         buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
708                                         n, err := fd.Read(buf)
709                                         if err != nil {
710                                                 state.Ctx.LogE("sp-file", sdsp, err, "")
711                                                 return
712                                         }
713                                         buf = buf[:n]
714                                         state.Ctx.LogD("sp-file", SdsAdd(sdsp, SDS{"size": n}), "read")
715                                 }
716                                 fd.Close()
717                                 payload = MarshalSP(SPTypeFile, SPFile{
718                                         Hash:    freq.Hash,
719                                         Offset:  freq.Offset,
720                                         Payload: buf,
721                                 })
722                                 ourSize := freq.Offset + uint64(len(buf))
723                                 sdsp["size"] = int64(ourSize)
724                                 sdsp["fullsize"] = fullSize
725                                 if state.Ctx.ShowPrgrs {
726                                         Progress("Tx", sdsp)
727                                 }
728                                 state.Lock()
729                                 if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
730                                         if ourSize == uint64(fullSize) {
731                                                 state.Ctx.LogD("sp-file", sdsp, "finished")
732                                                 if len(state.queueTheir) > 1 {
733                                                         state.queueTheir = state.queueTheir[1:]
734                                                 } else {
735                                                         state.queueTheir = state.queueTheir[:0]
736                                                 }
737                                         } else {
738                                                 state.queueTheir[0].freq.Offset += uint64(len(buf))
739                                         }
740                                 } else {
741                                         state.Ctx.LogD("sp-file", sdsp, "queue disappeared")
742                                 }
743                                 state.Unlock()
744                         }
745                         state.Ctx.LogD("sp-xmit", SdsAdd(sds, SDS{"size": len(payload)}), "sending")
746                         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
747                         if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil {
748                                 state.Ctx.LogE("sp-xmit", sds, err, "")
749                                 return
750                         }
751                 }
752         }()
753
754         // Receiver
755         state.wg.Add(1)
756         go func() {
757                 for {
758                         if state.NotAlive() {
759                                 break
760                         }
761                         state.Ctx.LogD("sp-recv", sds, "waiting for payload")
762                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
763                         payload, err := state.ReadSP(conn)
764                         if err != nil {
765                                 if err == io.EOF {
766                                         break
767                                 }
768                                 unmarshalErr := err.(*xdr.UnmarshalError)
769                                 if os.IsTimeout(unmarshalErr.Err) {
770                                         continue
771                                 }
772                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
773                                         break
774                                 }
775                                 state.Ctx.LogE("sp-recv", sds, err, "")
776                                 break
777                         }
778                         state.Ctx.LogD(
779                                 "sp-recv",
780                                 SdsAdd(sds, SDS{"size": len(payload)}),
781                                 "got payload",
782                         )
783                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
784                         if err != nil {
785                                 state.Ctx.LogE("sp-recv", sds, err, "")
786                                 break
787                         }
788                         state.Ctx.LogD(
789                                 "sp-recv",
790                                 SdsAdd(sds, SDS{"size": len(payload)}),
791                                 "processing",
792                         )
793                         replies, err := state.ProcessSP(payload)
794                         if err != nil {
795                                 state.Ctx.LogE("sp-recv", sds, err, "")
796                                 break
797                         }
798                         state.wg.Add(1)
799                         go func() {
800                                 for _, reply := range replies {
801                                         state.Ctx.LogD(
802                                                 "sp-recv",
803                                                 SdsAdd(sds, SDS{"size": len(reply)}),
804                                                 "queuing reply",
805                                         )
806                                         state.payloads <- reply
807                                 }
808                                 state.wg.Done()
809                         }()
810                         if state.rxRate > 0 {
811                                 time.Sleep(time.Second / time.Duration(state.rxRate))
812                         }
813                 }
814                 state.SetDead()
815                 state.wg.Done()
816                 state.SetDead()
817                 conn.Close()
818         }()
819
820         return nil
821 }
822
823 func (state *SPState) Wait() {
824         state.wg.Wait()
825         close(state.payloads)
826         close(state.pings)
827         state.dirUnlock()
828         state.Duration = time.Now().Sub(state.started)
829         state.RxSpeed = state.RxBytes
830         state.TxSpeed = state.TxBytes
831         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
832         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
833         if rxDuration > 0 {
834                 state.RxSpeed = state.RxBytes / rxDuration
835         }
836         if txDuration > 0 {
837                 state.TxSpeed = state.TxBytes / txDuration
838         }
839 }
840
841 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
842         sds := SDS{"node": state.Node.Id, "nice": int(state.Nice)}
843         r := bytes.NewReader(payload)
844         var err error
845         var replies [][]byte
846         var infosGot bool
847         for r.Len() > 0 {
848                 state.Ctx.LogD("sp-process", sds, "unmarshaling header")
849                 var head SPHead
850                 if _, err = xdr.Unmarshal(r, &head); err != nil {
851                         state.Ctx.LogE("sp-process", sds, err, "")
852                         return nil, err
853                 }
854                 switch head.Type {
855                 case SPTypeHalt:
856                         state.Ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "halt"}), "")
857                         state.Lock()
858                         state.queueTheir = nil
859                         state.Unlock()
860                 case SPTypePing:
861                         state.Ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "ping"}), "")
862                 case SPTypeInfo:
863                         infosGot = true
864                         sdsp := SdsAdd(sds, SDS{"type": "info"})
865                         state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
866                         var info SPInfo
867                         if _, err = xdr.Unmarshal(r, &info); err != nil {
868                                 state.Ctx.LogE("sp-process", sdsp, err, "")
869                                 return nil, err
870                         }
871                         sdsp = SdsAdd(sds, SDS{
872                                 "pkt":  ToBase32(info.Hash[:]),
873                                 "size": int64(info.Size),
874                                 "nice": int(info.Nice),
875                         })
876                         if !state.listOnly && info.Nice > state.Nice {
877                                 state.Ctx.LogD("sp-process", sdsp, "too nice")
878                                 continue
879                         }
880                         state.Ctx.LogD("sp-process", sdsp, "received")
881                         if !state.listOnly && state.xxOnly == TTx {
882                                 continue
883                         }
884                         state.Lock()
885                         state.infosTheir[*info.Hash] = &info
886                         state.Unlock()
887                         state.Ctx.LogD("sp-process", sdsp, "stating part")
888                         pktPath := filepath.Join(
889                                 state.Ctx.Spool,
890                                 state.Node.Id.String(),
891                                 string(TRx),
892                                 ToBase32(info.Hash[:]),
893                         )
894                         if _, err = os.Stat(pktPath); err == nil {
895                                 state.Ctx.LogI("sp-info", sdsp, "already done")
896                                 if !state.listOnly {
897                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
898                                 }
899                                 continue
900                         }
901                         if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
902                                 state.Ctx.LogI("sp-info", sdsp, "already seen")
903                                 if !state.listOnly {
904                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
905                                 }
906                                 continue
907                         }
908                         fi, err := os.Stat(pktPath + PartSuffix)
909                         var offset int64
910                         if err == nil {
911                                 offset = fi.Size()
912                         }
913                         if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
914                                 state.Ctx.LogI("sp-info", sdsp, "not enough space")
915                                 continue
916                         }
917                         state.Ctx.LogI(
918                                 "sp-info",
919                                 SdsAdd(sdsp, SDS{"offset": offset}),
920                                 "",
921                         )
922                         if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
923                                 replies = append(replies, MarshalSP(
924                                         SPTypeFreq,
925                                         SPFreq{info.Hash, uint64(offset)},
926                                 ))
927                         }
928                 case SPTypeFile:
929                         sdsp := SdsAdd(sds, SDS{"type": "file"})
930                         state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
931                         var file SPFile
932                         if _, err = xdr.Unmarshal(r, &file); err != nil {
933                                 state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"type": "file"}), err, "")
934                                 return nil, err
935                         }
936                         sdsp["xx"] = string(TRx)
937                         sdsp["pkt"] = ToBase32(file.Hash[:])
938                         sdsp["size"] = len(file.Payload)
939                         dirToSync := filepath.Join(
940                                 state.Ctx.Spool,
941                                 state.Node.Id.String(),
942                                 string(TRx),
943                         )
944                         filePath := filepath.Join(dirToSync, ToBase32(file.Hash[:]))
945                         state.Ctx.LogD("sp-file", sdsp, "opening part")
946                         fd, err := os.OpenFile(
947                                 filePath+PartSuffix,
948                                 os.O_RDWR|os.O_CREATE,
949                                 os.FileMode(0666),
950                         )
951                         if err != nil {
952                                 state.Ctx.LogE("sp-file", sdsp, err, "")
953                                 return nil, err
954                         }
955                         state.Ctx.LogD(
956                                 "sp-file",
957                                 SdsAdd(sdsp, SDS{"offset": file.Offset}),
958                                 "seeking",
959                         )
960                         if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
961                                 state.Ctx.LogE("sp-file", sdsp, err, "")
962                                 fd.Close()
963                                 return nil, err
964                         }
965                         state.Ctx.LogD("sp-file", sdsp, "writing")
966                         _, err = fd.Write(file.Payload)
967                         if err != nil {
968                                 state.Ctx.LogE("sp-file", sdsp, err, "")
969                                 fd.Close()
970                                 return nil, err
971                         }
972                         ourSize := int64(file.Offset + uint64(len(file.Payload)))
973                         sdsp["size"] = ourSize
974                         fullsize := int64(0)
975                         state.RLock()
976                         infoTheir, ok := state.infosTheir[*file.Hash]
977                         state.RUnlock()
978                         if ok {
979                                 fullsize = int64(infoTheir.Size)
980                         }
981                         sdsp["fullsize"] = fullsize
982                         if state.Ctx.ShowPrgrs {
983                                 Progress("Rx", sdsp)
984                         }
985                         if fullsize != ourSize {
986                                 fd.Close()
987                                 continue
988                         }
989                         spWorkersGroup.Wait()
990                         spWorkersGroup.Add(1)
991                         go func() {
992                                 if err := fd.Sync(); err != nil {
993                                         state.Ctx.LogE("sp-file", sdsp, err, "sync")
994                                         fd.Close()
995                                         return
996                                 }
997                                 state.wg.Add(1)
998                                 defer state.wg.Done()
999                                 fd.Seek(0, io.SeekStart)
1000                                 state.Ctx.LogD("sp-file", sdsp, "checking")
1001                                 gut, err := Check(fd, file.Hash[:], sdsp, state.Ctx.ShowPrgrs)
1002                                 fd.Close()
1003                                 if err != nil || !gut {
1004                                         state.Ctx.LogE("sp-file", sdsp, errors.New("checksum mismatch"), "")
1005                                         return
1006                                 }
1007                                 state.Ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "")
1008                                 if err = os.Rename(filePath+PartSuffix, filePath); err != nil {
1009                                         state.Ctx.LogE("sp-file", sdsp, err, "rename")
1010                                         return
1011                                 }
1012                                 if err = DirSync(dirToSync); err != nil {
1013                                         state.Ctx.LogE("sp-file", sdsp, err, "sync")
1014                                         return
1015                                 }
1016                                 state.Lock()
1017                                 delete(state.infosTheir, *file.Hash)
1018                                 state.Unlock()
1019                                 spWorkersGroup.Done()
1020                                 state.wg.Add(1)
1021                                 go func() {
1022                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
1023                                         state.wg.Done()
1024                                 }()
1025                         }()
1026                 case SPTypeDone:
1027                         sdsp := SdsAdd(sds, SDS{"type": "done"})
1028                         state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
1029                         var done SPDone
1030                         if _, err = xdr.Unmarshal(r, &done); err != nil {
1031                                 state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"type": "done"}), err, "")
1032                                 return nil, err
1033                         }
1034                         sdsp["pkt"] = ToBase32(done.Hash[:])
1035                         state.Ctx.LogD("sp-done", sdsp, "removing")
1036                         err := os.Remove(filepath.Join(
1037                                 state.Ctx.Spool,
1038                                 state.Node.Id.String(),
1039                                 string(TTx),
1040                                 ToBase32(done.Hash[:]),
1041                         ))
1042                         sdsp["xx"] = string(TTx)
1043                         if err == nil {
1044                                 state.Ctx.LogI("sp-done", sdsp, "")
1045                         } else {
1046                                 state.Ctx.LogE("sp-done", sdsp, err, "")
1047                         }
1048                 case SPTypeFreq:
1049                         sdsp := SdsAdd(sds, SDS{"type": "freq"})
1050                         state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
1051                         var freq SPFreq
1052                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
1053                                 state.Ctx.LogE("sp-process", sdsp, err, "")
1054                                 return nil, err
1055                         }
1056                         sdsp["pkt"] = ToBase32(freq.Hash[:])
1057                         sdsp["offset"] = freq.Offset
1058                         state.Ctx.LogD("sp-process", sdsp, "queueing")
1059                         nice, exists := state.infosOurSeen[*freq.Hash]
1060                         if exists {
1061                                 if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
1062                                         state.Lock()
1063                                         insertIdx := 0
1064                                         var freqWithNice *FreqWithNice
1065                                         for insertIdx, freqWithNice = range state.queueTheir {
1066                                                 if freqWithNice.nice > nice {
1067                                                         break
1068                                                 }
1069                                         }
1070                                         state.queueTheir = append(state.queueTheir, nil)
1071                                         copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
1072                                         state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
1073                                         state.Unlock()
1074                                 } else {
1075                                         state.Ctx.LogD("sp-process", sdsp, "skipping")
1076                                 }
1077                         } else {
1078                                 state.Ctx.LogD("sp-process", sdsp, "unknown")
1079                         }
1080                 default:
1081                         state.Ctx.LogE(
1082                                 "sp-process",
1083                                 SdsAdd(sds, SDS{"type": head.Type}),
1084                                 errors.New("unknown type"),
1085                                 "",
1086                         )
1087                         return nil, BadPktType
1088                 }
1089                 if head.Type != SPTypePing {
1090                         state.RxLastNonPing = state.RxLastSeen
1091                 }
1092         }
1093         if infosGot {
1094                 var pkts int
1095                 var size uint64
1096                 state.RLock()
1097                 for _, info := range state.infosTheir {
1098                         pkts++
1099                         size += info.Size
1100                 }
1101                 state.RUnlock()
1102                 state.Ctx.LogI("sp-infos", SDS{
1103                         "xx":   string(TRx),
1104                         "node": state.Node.Id,
1105                         "pkts": pkts,
1106                         "size": int64(size),
1107                 }, "")
1108         }
1109         return payloadsSplit(replies), nil
1110 }