]> Cypherpunks.ru repositories - nncp.git/blob - src/sp.go
4f522c9b52ac1512fd70941090f430d88b0b1f43
[nncp.git] / src / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2020 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bytes"
22         "crypto/subtle"
23         "errors"
24         "io"
25         "net"
26         "os"
27         "path/filepath"
28         "sort"
29         "sync"
30         "time"
31
32         xdr "github.com/davecgh/go-xdr/xdr2"
33         "github.com/flynn/noise"
34 )
35
// MaxSPSize is the byte budget for one batch of marshalled SP packets:
// payloadsSplit packs packets up to this size, the first handshake
// payload is padded towards it, and file chunks are sized from it.
const MaxSPSize = (1 << 16) - 256

// PartSuffix is not referenced in this part of the file; presumably it
// marks partially received spool files -- TODO confirm against its users.
const PartSuffix = ".part"

// SPHeadOverhead is the number of bytes reserved for a marshalled SPHead.
const SPHeadOverhead = 4
41
42 var (
43         MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}
44
45         SPInfoOverhead    int
46         SPFreqOverhead    int
47         SPFileOverhead    int
48         SPHaltMarshalized []byte
49         SPPingMarshalized []byte
50
51         NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
52                 noise.DH25519,
53                 noise.CipherChaChaPoly,
54                 noise.HashBLAKE2b,
55         )
56
57         DefaultDeadline = 10 * time.Second
58         PingTimeout     = time.Minute
59
60         spWorkersGroup sync.WaitGroup
61 )
62
// SPType is the kind of a sync-protocol packet, carried in SPHead.
type SPType uint8

// Packet kinds, numbered consecutively from zero in declaration order.
const (
	SPTypeInfo SPType = iota
	SPTypeFreq
	SPTypeFile
	SPTypeDone
	SPTypeHalt
	SPTypePing
)
73
74 type SPHead struct {
75         Type SPType
76 }
77
78 type SPInfo struct {
79         Nice uint8
80         Size uint64
81         Hash *[32]byte
82 }
83
84 type SPFreq struct {
85         Hash   *[32]byte
86         Offset uint64
87 }
88
89 type SPFile struct {
90         Hash    *[32]byte
91         Offset  uint64
92         Payload []byte
93 }
94
95 type SPDone struct {
96         Hash *[32]byte
97 }
98
99 type SPRaw struct {
100         Magic   [8]byte
101         Payload []byte
102 }
103
104 type FreqWithNice struct {
105         freq *SPFreq
106         nice uint8
107 }
108
109 type ConnDeadlined interface {
110         io.ReadWriteCloser
111         SetReadDeadline(t time.Time) error
112         SetWriteDeadline(t time.Time) error
113 }
114
115 func init() {
116         var buf bytes.Buffer
117         spHead := SPHead{Type: SPTypeHalt}
118         if _, err := xdr.Marshal(&buf, spHead); err != nil {
119                 panic(err)
120         }
121         SPHaltMarshalized = make([]byte, SPHeadOverhead)
122         copy(SPHaltMarshalized, buf.Bytes())
123         buf.Reset()
124
125         spHead = SPHead{Type: SPTypePing}
126         if _, err := xdr.Marshal(&buf, spHead); err != nil {
127                 panic(err)
128         }
129         SPPingMarshalized = make([]byte, SPHeadOverhead)
130         copy(SPPingMarshalized, buf.Bytes())
131         buf.Reset()
132
133         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
134         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
135                 panic(err)
136         }
137         SPInfoOverhead = buf.Len()
138         buf.Reset()
139
140         spFreq := SPFreq{Hash: new([32]byte), Offset: 123}
141         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
142                 panic(err)
143         }
144         SPFreqOverhead = buf.Len()
145         buf.Reset()
146
147         spFile := SPFile{Hash: new([32]byte), Offset: 123}
148         if _, err := xdr.Marshal(&buf, spFile); err != nil {
149                 panic(err)
150         }
151         SPFileOverhead = buf.Len()
152 }
153
154 func MarshalSP(typ SPType, sp interface{}) []byte {
155         var buf bytes.Buffer
156         if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil {
157                 panic(err)
158         }
159         if _, err := xdr.Marshal(&buf, sp); err != nil {
160                 panic(err)
161         }
162         return buf.Bytes()
163 }
164
165 func payloadsSplit(payloads [][]byte) [][]byte {
166         var outbounds [][]byte
167         outbound := make([]byte, 0, MaxSPSize)
168         for i, payload := range payloads {
169                 outbound = append(outbound, payload...)
170                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
171                         outbounds = append(outbounds, outbound)
172                         outbound = make([]byte, 0, MaxSPSize)
173                 }
174         }
175         if len(outbound) > 0 {
176                 outbounds = append(outbounds, outbound)
177         }
178         return outbounds
179 }
180
181 type SPState struct {
182         Ctx            *Ctx
183         Node           *Node
184         Nice           uint8
185         onlineDeadline time.Duration
186         maxOnlineTime  time.Duration
187         hs             *noise.HandshakeState
188         csOur          *noise.CipherState
189         csTheir        *noise.CipherState
190         payloads       chan []byte
191         pings          chan struct{}
192         infosTheir     map[[32]byte]*SPInfo
193         infosOurSeen   map[[32]byte]uint8
194         queueTheir     []*FreqWithNice
195         wg             sync.WaitGroup
196         RxBytes        int64
197         RxLastSeen     time.Time
198         RxLastNonPing  time.Time
199         TxBytes        int64
200         TxLastSeen     time.Time
201         TxLastNonPing  time.Time
202         started        time.Time
203         mustFinishAt   time.Time
204         Duration       time.Duration
205         RxSpeed        int64
206         TxSpeed        int64
207         rxLock         *os.File
208         txLock         *os.File
209         xxOnly         TRxTx
210         rxRate         int
211         txRate         int
212         isDead         chan struct{}
213         listOnly       bool
214         onlyPkts       map[[32]byte]bool
215         writeSPBuf     bytes.Buffer
216         sync.RWMutex
217 }
218
219 func (state *SPState) SetDead() {
220         state.Lock()
221         defer state.Unlock()
222         select {
223         case <-state.isDead:
224                 // Already closed channel, dead
225                 return
226         default:
227         }
228         close(state.isDead)
229         go func() {
230                 for _ = range state.payloads {
231                 }
232         }()
233         go func() {
234                 for _ = range state.pings {
235                 }
236         }()
237 }
238
239 func (state *SPState) NotAlive() bool {
240         select {
241         case <-state.isDead:
242                 return true
243         default:
244         }
245         return false
246 }
247
248 func (state *SPState) dirUnlock() {
249         state.Ctx.UnlockDir(state.rxLock)
250         state.Ctx.UnlockDir(state.txLock)
251 }
252
253 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
254         state.writeSPBuf.Reset()
255         n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
256                 Magic:   MagicNNCPLv1,
257                 Payload: payload,
258         })
259         if err != nil {
260                 return err
261         }
262         if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil {
263                 state.TxLastSeen = time.Now()
264                 state.TxBytes += int64(n)
265                 if !ping {
266                         state.TxLastNonPing = state.TxLastSeen
267                 }
268         }
269         return err
270 }
271
272 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
273         var sp SPRaw
274         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
275         if err != nil {
276                 ue := err.(*xdr.UnmarshalError)
277                 if ue.Err == io.EOF {
278                         return nil, ue.Err
279                 }
280                 return nil, err
281         }
282         state.RxLastSeen = time.Now()
283         state.RxBytes += int64(n)
284         if sp.Magic != MagicNNCPLv1 {
285                 return nil, BadMagic
286         }
287         return sp.Payload, nil
288 }
289
290 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
291         var infos []*SPInfo
292         var totalSize int64
293         for job := range ctx.Jobs(nodeId, TTx) {
294                 job.Fd.Close()
295                 if job.PktEnc.Nice > nice {
296                         continue
297                 }
298                 if _, known := (*seen)[*job.HshValue]; known {
299                         continue
300                 }
301                 totalSize += job.Size
302                 infos = append(infos, &SPInfo{
303                         Nice: job.PktEnc.Nice,
304                         Size: uint64(job.Size),
305                         Hash: job.HshValue,
306                 })
307                 (*seen)[*job.HshValue] = job.PktEnc.Nice
308         }
309         sort.Sort(ByNice(infos))
310         var payloads [][]byte
311         for _, info := range infos {
312                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
313                 ctx.LogD("sp-info-our", SDS{
314                         "node": nodeId,
315                         "name": ToBase32(info.Hash[:]),
316                         "size": info.Size,
317                 }, "")
318         }
319         if totalSize > 0 {
320                 ctx.LogI("sp-infos", SDS{
321                         "xx":   string(TTx),
322                         "node": nodeId,
323                         "pkts": len(payloads),
324                         "size": totalSize,
325                 }, "")
326         }
327         return payloadsSplit(payloads)
328 }
329
330 func (state *SPState) StartI(conn ConnDeadlined) error {
331         nodeId := state.Node.Id
332         err := state.Ctx.ensureRxDir(nodeId)
333         if err != nil {
334                 return err
335         }
336         var rxLock *os.File
337         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
338                 rxLock, err = state.Ctx.LockDir(nodeId, string(TRx))
339                 if err != nil {
340                         return err
341                 }
342         }
343         var txLock *os.File
344         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
345                 txLock, err = state.Ctx.LockDir(nodeId, string(TTx))
346                 if err != nil {
347                         return err
348                 }
349         }
350         started := time.Now()
351         conf := noise.Config{
352                 CipherSuite: NoiseCipherSuite,
353                 Pattern:     noise.HandshakeIK,
354                 Initiator:   true,
355                 StaticKeypair: noise.DHKey{
356                         Private: state.Ctx.Self.NoisePrv[:],
357                         Public:  state.Ctx.Self.NoisePub[:],
358                 },
359                 PeerStatic: state.Node.NoisePub[:],
360         }
361         hs, err := noise.NewHandshakeState(conf)
362         if err != nil {
363                 return err
364         }
365         state.hs = hs
366         state.payloads = make(chan []byte)
367         state.pings = make(chan struct{})
368         state.infosTheir = make(map[[32]byte]*SPInfo)
369         state.infosOurSeen = make(map[[32]byte]uint8)
370         state.started = started
371         state.rxLock = rxLock
372         state.txLock = txLock
373
374         var infosPayloads [][]byte
375         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
376                 infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
377         }
378         var firstPayload []byte
379         if len(infosPayloads) > 0 {
380                 firstPayload = infosPayloads[0]
381         }
382         // Pad first payload, to hide actual number of existing files
383         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
384                 firstPayload = append(firstPayload, SPHaltMarshalized...)
385         }
386
387         var buf []byte
388         var payload []byte
389         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
390         if err != nil {
391                 state.dirUnlock()
392                 return err
393         }
394         sds := SDS{"node": nodeId, "nice": int(state.Nice)}
395         state.Ctx.LogD("sp-start", sds, "sending first message")
396         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
397         if err = state.WriteSP(conn, buf, false); err != nil {
398                 state.Ctx.LogE("sp-start", sds, err, "")
399                 state.dirUnlock()
400                 return err
401         }
402         state.Ctx.LogD("sp-start", sds, "waiting for first message")
403         conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
404         if buf, err = state.ReadSP(conn); err != nil {
405                 state.Ctx.LogE("sp-start", sds, err, "")
406                 state.dirUnlock()
407                 return err
408         }
409         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
410         if err != nil {
411                 state.Ctx.LogE("sp-start", sds, err, "")
412                 state.dirUnlock()
413                 return err
414         }
415         state.Ctx.LogD("sp-start", sds, "starting workers")
416         err = state.StartWorkers(conn, infosPayloads, payload)
417         if err != nil {
418                 state.Ctx.LogE("sp-start", sds, err, "")
419                 state.dirUnlock()
420         }
421         return err
422 }
423
424 func (state *SPState) StartR(conn ConnDeadlined) error {
425         started := time.Now()
426         conf := noise.Config{
427                 CipherSuite: NoiseCipherSuite,
428                 Pattern:     noise.HandshakeIK,
429                 Initiator:   false,
430                 StaticKeypair: noise.DHKey{
431                         Private: state.Ctx.Self.NoisePrv[:],
432                         Public:  state.Ctx.Self.NoisePub[:],
433                 },
434         }
435         hs, err := noise.NewHandshakeState(conf)
436         if err != nil {
437                 return err
438         }
439         xxOnly := TRxTx("")
440         state.hs = hs
441         state.payloads = make(chan []byte)
442         state.pings = make(chan struct{})
443         state.infosOurSeen = make(map[[32]byte]uint8)
444         state.infosTheir = make(map[[32]byte]*SPInfo)
445         state.started = started
446         state.xxOnly = xxOnly
447         var buf []byte
448         var payload []byte
449         state.Ctx.LogD("sp-start", SDS{"nice": int(state.Nice)}, "waiting for first message")
450         conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
451         if buf, err = state.ReadSP(conn); err != nil {
452                 state.Ctx.LogE("sp-start", SDS{}, err, "")
453                 return err
454         }
455         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
456                 state.Ctx.LogE("sp-start", SDS{}, err, "")
457                 return err
458         }
459
460         var node *Node
461         for _, n := range state.Ctx.Neigh {
462                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
463                         node = n
464                         break
465                 }
466         }
467         if node == nil {
468                 peerId := ToBase32(state.hs.PeerStatic())
469                 state.Ctx.LogE("sp-start", SDS{"peer": peerId}, errors.New("unknown"), "")
470                 return errors.New("Unknown peer: " + peerId)
471         }
472         state.Node = node
473         state.rxRate = node.RxRate
474         state.txRate = node.TxRate
475         state.onlineDeadline = node.OnlineDeadline
476         state.maxOnlineTime = node.MaxOnlineTime
477         sds := SDS{"node": node.Id, "nice": int(state.Nice)}
478
479         if state.Ctx.ensureRxDir(node.Id); err != nil {
480                 return err
481         }
482         var rxLock *os.File
483         if xxOnly == "" || xxOnly == TRx {
484                 rxLock, err = state.Ctx.LockDir(node.Id, string(TRx))
485                 if err != nil {
486                         return err
487                 }
488         }
489         state.rxLock = rxLock
490         var txLock *os.File
491         if xxOnly == "" || xxOnly == TTx {
492                 txLock, err = state.Ctx.LockDir(node.Id, string(TTx))
493                 if err != nil {
494                         return err
495                 }
496         }
497         state.txLock = txLock
498
499         var infosPayloads [][]byte
500         if xxOnly == "" || xxOnly == TTx {
501                 infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
502         }
503         var firstPayload []byte
504         if len(infosPayloads) > 0 {
505                 firstPayload = infosPayloads[0]
506         }
507         // Pad first payload, to hide actual number of existing files
508         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
509                 firstPayload = append(firstPayload, SPHaltMarshalized...)
510         }
511
512         state.Ctx.LogD("sp-start", sds, "sending first message")
513         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
514         if err != nil {
515                 state.dirUnlock()
516                 return err
517         }
518         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
519         if err = state.WriteSP(conn, buf, false); err != nil {
520                 state.Ctx.LogE("sp-start", sds, err, "")
521                 state.dirUnlock()
522                 return err
523         }
524         state.Ctx.LogD("sp-start", sds, "starting workers")
525         err = state.StartWorkers(conn, infosPayloads, payload)
526         if err != nil {
527                 state.dirUnlock()
528         }
529         return err
530 }
531
532 func (state *SPState) StartWorkers(
533         conn ConnDeadlined,
534         infosPayloads [][]byte,
535         payload []byte,
536 ) error {
537         sds := SDS{"node": state.Node.Id, "nice": int(state.Nice)}
538         state.isDead = make(chan struct{})
539         if state.maxOnlineTime > 0 {
540                 state.mustFinishAt = state.started.Add(state.maxOnlineTime)
541         }
542
543         // Remaining handshake payload sending
544         if len(infosPayloads) > 1 {
545                 state.wg.Add(1)
546                 go func() {
547                         for _, payload := range infosPayloads[1:] {
548                                 state.Ctx.LogD(
549                                         "sp-work",
550                                         SdsAdd(sds, SDS{"size": len(payload)}),
551                                         "queuing remaining payload",
552                                 )
553                                 state.payloads <- payload
554                         }
555                         state.wg.Done()
556                 }()
557         }
558
559         // Processing of first payload and queueing its responses
560         state.Ctx.LogD(
561                 "sp-work",
562                 SdsAdd(sds, SDS{"size": len(payload)}),
563                 "processing first payload",
564         )
565         replies, err := state.ProcessSP(payload)
566         if err != nil {
567                 state.Ctx.LogE("sp-work", sds, err, "")
568                 return err
569         }
570         state.wg.Add(1)
571         go func() {
572                 for _, reply := range replies {
573                         state.Ctx.LogD(
574                                 "sp-work",
575                                 SdsAdd(sds, SDS{"size": len(reply)}),
576                                 "queuing reply",
577                         )
578                         state.payloads <- reply
579                 }
580                 state.wg.Done()
581         }()
582
583         // Periodic jobs
584         state.wg.Add(1)
585         go func() {
586                 deadlineTicker := time.NewTicker(time.Second)
587                 pingTicker := time.NewTicker(PingTimeout)
588                 for {
589                         select {
590                         case <-state.isDead:
591                                 state.wg.Done()
592                                 deadlineTicker.Stop()
593                                 pingTicker.Stop()
594                                 return
595                         case now := <-deadlineTicker.C:
596                                 if (now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
597                                         now.Sub(state.TxLastNonPing) >= state.onlineDeadline) ||
598                                         (state.maxOnlineTime > 0 && state.mustFinishAt.Before(now)) ||
599                                         (now.Sub(state.RxLastSeen) >= 2*PingTimeout) {
600                                         state.SetDead()
601                                         conn.Close()
602                                 }
603                         case now := <-pingTicker.C:
604                                 if now.After(state.TxLastSeen.Add(PingTimeout)) {
605                                         state.wg.Add(1)
606                                         go func() {
607                                                 state.pings <- struct{}{}
608                                                 state.wg.Done()
609                                         }()
610                                 }
611                         }
612                 }
613         }()
614
615         // Spool checker and INFOs sender of appearing files
616         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
617                 state.wg.Add(1)
618                 go func() {
619                         ticker := time.NewTicker(time.Second)
620                         for {
621                                 select {
622                                 case <-state.isDead:
623                                         state.wg.Done()
624                                         ticker.Stop()
625                                         return
626                                 case <-ticker.C:
627                                         for _, payload := range state.Ctx.infosOur(
628                                                 state.Node.Id,
629                                                 state.Nice,
630                                                 &state.infosOurSeen,
631                                         ) {
632                                                 state.Ctx.LogD(
633                                                         "sp-work",
634                                                         SdsAdd(sds, SDS{"size": len(payload)}),
635                                                         "queuing new info",
636                                                 )
637                                                 state.payloads <- payload
638                                         }
639                                 }
640                         }
641                 }()
642         }
643
644         // Sender
645         state.wg.Add(1)
646         go func() {
647                 defer conn.Close()
648                 defer state.SetDead()
649                 defer state.wg.Done()
650                 for {
651                         if state.NotAlive() {
652                                 return
653                         }
654                         var payload []byte
655                         var ping bool
656                         select {
657                         case <-state.pings:
658                                 state.Ctx.LogD("sp-xmit", sds, "got ping")
659                                 payload = SPPingMarshalized
660                                 ping = true
661                         case payload = <-state.payloads:
662                                 state.Ctx.LogD(
663                                         "sp-xmit",
664                                         SdsAdd(sds, SDS{"size": len(payload)}),
665                                         "got payload",
666                                 )
667                         default:
668                                 state.RLock()
669                                 if len(state.queueTheir) == 0 {
670                                         state.RUnlock()
671                                         time.Sleep(100 * time.Millisecond)
672                                         continue
673                                 }
674                                 freq := state.queueTheir[0].freq
675                                 state.RUnlock()
676                                 if state.txRate > 0 {
677                                         time.Sleep(time.Second / time.Duration(state.txRate))
678                                 }
679                                 sdsp := SdsAdd(sds, SDS{
680                                         "xx":   string(TTx),
681                                         "pkt":  ToBase32(freq.Hash[:]),
682                                         "size": int64(freq.Offset),
683                                 })
684                                 state.Ctx.LogD("sp-file", sdsp, "queueing")
685                                 fd, err := os.Open(filepath.Join(
686                                         state.Ctx.Spool,
687                                         state.Node.Id.String(),
688                                         string(TTx),
689                                         ToBase32(freq.Hash[:]),
690                                 ))
691                                 if err != nil {
692                                         state.Ctx.LogE("sp-file", sdsp, err, "")
693                                         return
694                                 }
695                                 fi, err := fd.Stat()
696                                 if err != nil {
697                                         state.Ctx.LogE("sp-file", sdsp, err, "")
698                                         return
699                                 }
700                                 fullSize := fi.Size()
701                                 var buf []byte
702                                 if freq.Offset < uint64(fullSize) {
703                                         state.Ctx.LogD("sp-file", sdsp, "seeking")
704                                         if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
705                                                 state.Ctx.LogE("sp-file", sdsp, err, "")
706                                                 return
707                                         }
708                                         buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
709                                         n, err := fd.Read(buf)
710                                         if err != nil {
711                                                 state.Ctx.LogE("sp-file", sdsp, err, "")
712                                                 return
713                                         }
714                                         buf = buf[:n]
715                                         state.Ctx.LogD("sp-file", SdsAdd(sdsp, SDS{"size": n}), "read")
716                                 }
717                                 fd.Close()
718                                 payload = MarshalSP(SPTypeFile, SPFile{
719                                         Hash:    freq.Hash,
720                                         Offset:  freq.Offset,
721                                         Payload: buf,
722                                 })
723                                 ourSize := freq.Offset + uint64(len(buf))
724                                 sdsp["size"] = int64(ourSize)
725                                 sdsp["fullsize"] = fullSize
726                                 if state.Ctx.ShowPrgrs {
727                                         Progress("Tx", sdsp)
728                                 }
729                                 state.Lock()
730                                 if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
731                                         if ourSize == uint64(fullSize) {
732                                                 state.Ctx.LogD("sp-file", sdsp, "finished")
733                                                 if len(state.queueTheir) > 1 {
734                                                         state.queueTheir = state.queueTheir[1:]
735                                                 } else {
736                                                         state.queueTheir = state.queueTheir[:0]
737                                                 }
738                                         } else {
739                                                 state.queueTheir[0].freq.Offset += uint64(len(buf))
740                                         }
741                                 } else {
742                                         state.Ctx.LogD("sp-file", sdsp, "queue disappeared")
743                                 }
744                                 state.Unlock()
745                         }
746                         state.Ctx.LogD("sp-xmit", SdsAdd(sds, SDS{"size": len(payload)}), "sending")
747                         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
748                         if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil {
749                                 state.Ctx.LogE("sp-xmit", sds, err, "")
750                                 return
751                         }
752                 }
753         }()
754
755         // Receiver
756         state.wg.Add(1)
757         go func() {
758                 for {
759                         if state.NotAlive() {
760                                 break
761                         }
762                         state.Ctx.LogD("sp-recv", sds, "waiting for payload")
763                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
764                         payload, err := state.ReadSP(conn)
765                         if err != nil {
766                                 if err == io.EOF {
767                                         break
768                                 }
769                                 unmarshalErr := err.(*xdr.UnmarshalError)
770                                 netErr, ok := unmarshalErr.Err.(net.Error)
771                                 if ok && netErr.Timeout() {
772                                         continue
773                                 }
774                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
775                                         break
776                                 }
777                                 state.Ctx.LogE("sp-recv", sds, err, "")
778                                 break
779                         }
780                         state.Ctx.LogD(
781                                 "sp-recv",
782                                 SdsAdd(sds, SDS{"size": len(payload)}),
783                                 "got payload",
784                         )
785                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
786                         if err != nil {
787                                 state.Ctx.LogE("sp-recv", sds, err, "")
788                                 break
789                         }
790                         state.Ctx.LogD(
791                                 "sp-recv",
792                                 SdsAdd(sds, SDS{"size": len(payload)}),
793                                 "processing",
794                         )
795                         replies, err := state.ProcessSP(payload)
796                         if err != nil {
797                                 state.Ctx.LogE("sp-recv", sds, err, "")
798                                 break
799                         }
800                         state.wg.Add(1)
801                         go func() {
802                                 for _, reply := range replies {
803                                         state.Ctx.LogD(
804                                                 "sp-recv",
805                                                 SdsAdd(sds, SDS{"size": len(reply)}),
806                                                 "queuing reply",
807                                         )
808                                         state.payloads <- reply
809                                 }
810                                 state.wg.Done()
811                         }()
812                         if state.rxRate > 0 {
813                                 time.Sleep(time.Second / time.Duration(state.rxRate))
814                         }
815                 }
816                 state.SetDead()
817                 state.wg.Done()
818                 state.SetDead()
819                 conn.Close()
820         }()
821
822         return nil
823 }
824
825 func (state *SPState) Wait() {
826         state.wg.Wait()
827         close(state.payloads)
828         close(state.pings)
829         state.dirUnlock()
830         state.Duration = time.Now().Sub(state.started)
831         state.RxSpeed = state.RxBytes
832         state.TxSpeed = state.TxBytes
833         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
834         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
835         if rxDuration > 0 {
836                 state.RxSpeed = state.RxBytes / rxDuration
837         }
838         if txDuration > 0 {
839                 state.TxSpeed = state.TxBytes / txDuration
840         }
841 }
842
843 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
844         sds := SDS{"node": state.Node.Id, "nice": int(state.Nice)}
845         r := bytes.NewReader(payload)
846         var err error
847         var replies [][]byte
848         var infosGot bool
849         for r.Len() > 0 {
850                 state.Ctx.LogD("sp-process", sds, "unmarshaling header")
851                 var head SPHead
852                 if _, err = xdr.Unmarshal(r, &head); err != nil {
853                         state.Ctx.LogE("sp-process", sds, err, "")
854                         return nil, err
855                 }
856                 switch head.Type {
857                 case SPTypeHalt:
858                         state.Ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "halt"}), "")
859                         state.Lock()
860                         state.queueTheir = nil
861                         state.Unlock()
862                 case SPTypePing:
863                         state.Ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "ping"}), "")
864                 case SPTypeInfo:
865                         infosGot = true
866                         sdsp := SdsAdd(sds, SDS{"type": "info"})
867                         state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
868                         var info SPInfo
869                         if _, err = xdr.Unmarshal(r, &info); err != nil {
870                                 state.Ctx.LogE("sp-process", sdsp, err, "")
871                                 return nil, err
872                         }
873                         sdsp = SdsAdd(sds, SDS{
874                                 "pkt":  ToBase32(info.Hash[:]),
875                                 "size": int64(info.Size),
876                                 "nice": int(info.Nice),
877                         })
878                         if !state.listOnly && info.Nice > state.Nice {
879                                 state.Ctx.LogD("sp-process", sdsp, "too nice")
880                                 continue
881                         }
882                         state.Ctx.LogD("sp-process", sdsp, "received")
883                         if !state.listOnly && state.xxOnly == TTx {
884                                 continue
885                         }
886                         state.Lock()
887                         state.infosTheir[*info.Hash] = &info
888                         state.Unlock()
889                         state.Ctx.LogD("sp-process", sdsp, "stating part")
890                         pktPath := filepath.Join(
891                                 state.Ctx.Spool,
892                                 state.Node.Id.String(),
893                                 string(TRx),
894                                 ToBase32(info.Hash[:]),
895                         )
896                         if _, err = os.Stat(pktPath); err == nil {
897                                 state.Ctx.LogI("sp-info", sdsp, "already done")
898                                 if !state.listOnly {
899                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
900                                 }
901                                 continue
902                         }
903                         if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
904                                 state.Ctx.LogI("sp-info", sdsp, "already seen")
905                                 if !state.listOnly {
906                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
907                                 }
908                                 continue
909                         }
910                         fi, err := os.Stat(pktPath + PartSuffix)
911                         var offset int64
912                         if err == nil {
913                                 offset = fi.Size()
914                         }
915                         if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
916                                 state.Ctx.LogI("sp-info", sdsp, "not enough space")
917                                 continue
918                         }
919                         state.Ctx.LogI(
920                                 "sp-info",
921                                 SdsAdd(sdsp, SDS{"offset": offset}),
922                                 "",
923                         )
924                         if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
925                                 replies = append(replies, MarshalSP(
926                                         SPTypeFreq,
927                                         SPFreq{info.Hash, uint64(offset)},
928                                 ))
929                         }
930                 case SPTypeFile:
931                         sdsp := SdsAdd(sds, SDS{"type": "file"})
932                         state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
933                         var file SPFile
934                         if _, err = xdr.Unmarshal(r, &file); err != nil {
935                                 state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"type": "file"}), err, "")
936                                 return nil, err
937                         }
938                         sdsp["xx"] = string(TRx)
939                         sdsp["pkt"] = ToBase32(file.Hash[:])
940                         sdsp["size"] = len(file.Payload)
941                         dirToSync := filepath.Join(
942                                 state.Ctx.Spool,
943                                 state.Node.Id.String(),
944                                 string(TRx),
945                         )
946                         filePath := filepath.Join(dirToSync, ToBase32(file.Hash[:]))
947                         state.Ctx.LogD("sp-file", sdsp, "opening part")
948                         fd, err := os.OpenFile(
949                                 filePath+PartSuffix,
950                                 os.O_RDWR|os.O_CREATE,
951                                 os.FileMode(0666),
952                         )
953                         if err != nil {
954                                 state.Ctx.LogE("sp-file", sdsp, err, "")
955                                 return nil, err
956                         }
957                         state.Ctx.LogD(
958                                 "sp-file",
959                                 SdsAdd(sdsp, SDS{"offset": file.Offset}),
960                                 "seeking",
961                         )
962                         if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
963                                 state.Ctx.LogE("sp-file", sdsp, err, "")
964                                 fd.Close()
965                                 return nil, err
966                         }
967                         state.Ctx.LogD("sp-file", sdsp, "writing")
968                         _, err = fd.Write(file.Payload)
969                         if err != nil {
970                                 state.Ctx.LogE("sp-file", sdsp, err, "")
971                                 fd.Close()
972                                 return nil, err
973                         }
974                         ourSize := int64(file.Offset + uint64(len(file.Payload)))
975                         sdsp["size"] = ourSize
976                         fullsize := int64(0)
977                         state.RLock()
978                         infoTheir, ok := state.infosTheir[*file.Hash]
979                         state.RUnlock()
980                         if ok {
981                                 fullsize = int64(infoTheir.Size)
982                         }
983                         sdsp["fullsize"] = fullsize
984                         if state.Ctx.ShowPrgrs {
985                                 Progress("Rx", sdsp)
986                         }
987                         if fullsize != ourSize {
988                                 fd.Close()
989                                 continue
990                         }
991                         spWorkersGroup.Wait()
992                         spWorkersGroup.Add(1)
993                         go func() {
994                                 if err := fd.Sync(); err != nil {
995                                         state.Ctx.LogE("sp-file", sdsp, err, "sync")
996                                         fd.Close()
997                                         return
998                                 }
999                                 state.wg.Add(1)
1000                                 defer state.wg.Done()
1001                                 fd.Seek(0, io.SeekStart)
1002                                 state.Ctx.LogD("sp-file", sdsp, "checking")
1003                                 gut, err := Check(fd, file.Hash[:], sdsp, state.Ctx.ShowPrgrs)
1004                                 fd.Close()
1005                                 if err != nil || !gut {
1006                                         state.Ctx.LogE("sp-file", sdsp, errors.New("checksum mismatch"), "")
1007                                         return
1008                                 }
1009                                 state.Ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "")
1010                                 if err = os.Rename(filePath+PartSuffix, filePath); err != nil {
1011                                         state.Ctx.LogE("sp-file", sdsp, err, "rename")
1012                                         return
1013                                 }
1014                                 if err = DirSync(dirToSync); err != nil {
1015                                         state.Ctx.LogE("sp-file", sdsp, err, "sync")
1016                                         return
1017                                 }
1018                                 state.Lock()
1019                                 delete(state.infosTheir, *file.Hash)
1020                                 state.Unlock()
1021                                 spWorkersGroup.Done()
1022                                 state.wg.Add(1)
1023                                 go func() {
1024                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
1025                                         state.wg.Done()
1026                                 }()
1027                         }()
1028                 case SPTypeDone:
1029                         sdsp := SdsAdd(sds, SDS{"type": "done"})
1030                         state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
1031                         var done SPDone
1032                         if _, err = xdr.Unmarshal(r, &done); err != nil {
1033                                 state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"type": "done"}), err, "")
1034                                 return nil, err
1035                         }
1036                         sdsp["pkt"] = ToBase32(done.Hash[:])
1037                         state.Ctx.LogD("sp-done", sdsp, "removing")
1038                         err := os.Remove(filepath.Join(
1039                                 state.Ctx.Spool,
1040                                 state.Node.Id.String(),
1041                                 string(TTx),
1042                                 ToBase32(done.Hash[:]),
1043                         ))
1044                         sdsp["xx"] = string(TTx)
1045                         if err == nil {
1046                                 state.Ctx.LogI("sp-done", sdsp, "")
1047                         } else {
1048                                 state.Ctx.LogE("sp-done", sdsp, err, "")
1049                         }
1050                 case SPTypeFreq:
1051                         sdsp := SdsAdd(sds, SDS{"type": "freq"})
1052                         state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
1053                         var freq SPFreq
1054                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
1055                                 state.Ctx.LogE("sp-process", sdsp, err, "")
1056                                 return nil, err
1057                         }
1058                         sdsp["pkt"] = ToBase32(freq.Hash[:])
1059                         sdsp["offset"] = freq.Offset
1060                         state.Ctx.LogD("sp-process", sdsp, "queueing")
1061                         nice, exists := state.infosOurSeen[*freq.Hash]
1062                         if exists {
1063                                 if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
1064                                         state.Lock()
1065                                         insertIdx := 0
1066                                         var freqWithNice *FreqWithNice
1067                                         for insertIdx, freqWithNice = range state.queueTheir {
1068                                                 if freqWithNice.nice > nice {
1069                                                         break
1070                                                 }
1071                                         }
1072                                         state.queueTheir = append(state.queueTheir, nil)
1073                                         copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
1074                                         state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
1075                                         state.Unlock()
1076                                 } else {
1077                                         state.Ctx.LogD("sp-process", sdsp, "skipping")
1078                                 }
1079                         } else {
1080                                 state.Ctx.LogD("sp-process", sdsp, "unknown")
1081                         }
1082                 default:
1083                         state.Ctx.LogE(
1084                                 "sp-process",
1085                                 SdsAdd(sds, SDS{"type": head.Type}),
1086                                 errors.New("unknown type"),
1087                                 "",
1088                         )
1089                         return nil, BadPktType
1090                 }
1091                 if head.Type != SPTypePing {
1092                         state.RxLastNonPing = state.RxLastSeen
1093                 }
1094         }
1095         if infosGot {
1096                 var pkts int
1097                 var size uint64
1098                 state.RLock()
1099                 for _, info := range state.infosTheir {
1100                         pkts++
1101                         size += info.Size
1102                 }
1103                 state.RUnlock()
1104                 state.Ctx.LogI("sp-infos", SDS{
1105                         "xx":   string(TRx),
1106                         "node": state.Node.Id,
1107                         "pkts": pkts,
1108                         "size": int64(size),
1109                 }, "")
1110         }
1111         return payloadsSplit(replies), nil
1112 }