Cypherpunks.ru repositories - nncp.git/blob - src/cypherpunks.ru/nncp/sp.go
nncp-call -pkts option
[nncp.git] / src / cypherpunks.ru / nncp / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2019 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 package nncp
20
21 import (
22         "bytes"
23         "crypto/subtle"
24         "errors"
25         "io"
26         "net"
27         "os"
28         "path/filepath"
29         "sort"
30         "strconv"
31         "sync"
32         "time"
33
34         "github.com/davecgh/go-xdr/xdr2"
35         "github.com/flynn/noise"
36 )
37
// MaxSPSize is the upper bound, in bytes, on a single SP payload; payloads
// are batched and file chunks are sized so that they do not exceed it.
const MaxSPSize = (1 << 16) - 256

// PartSuffix is a file name suffix (presumably for partially received
// packets -- its users are outside this part of the file).
const PartSuffix = ".part"

// DefaultDeadline is the number of seconds used for every read and write
// deadline set on the connection. It is deliberately untyped so it can be
// multiplied by time.Second.
const DefaultDeadline = 10
43
44 var (
45         MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}
46
47         SPHeadOverhead    int
48         SPInfoOverhead    int
49         SPFreqOverhead    int
50         SPFileOverhead    int
51         SPHaltMarshalized []byte
52
53         NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
54                 noise.DH25519,
55                 noise.CipherChaChaPoly,
56                 noise.HashBLAKE2b,
57         )
58
59         spWorkersGroup sync.WaitGroup
60 )
61
// SPType identifies the kind of packet that follows an SPHead.
type SPType uint8

// Known SP packet types, numbered sequentially from zero. The numeric
// values are part of the wire format and must not be reordered.
const (
	SPTypeInfo SPType = iota
	SPTypeFreq
	SPTypeFile
	SPTypeDone
	SPTypeHalt
)
71
72 type SPHead struct {
73         Type SPType
74 }
75
76 type SPInfo struct {
77         Nice uint8
78         Size uint64
79         Hash *[32]byte
80 }
81
82 type SPFreq struct {
83         Hash   *[32]byte
84         Offset uint64
85 }
86
87 type SPFile struct {
88         Hash    *[32]byte
89         Offset  uint64
90         Payload []byte
91 }
92
93 type SPDone struct {
94         Hash *[32]byte
95 }
96
97 type SPRaw struct {
98         Magic   [8]byte
99         Payload []byte
100 }
101
102 type FreqWithNice struct {
103         freq *SPFreq
104         nice uint8
105 }
106
107 type ConnDeadlined interface {
108         io.ReadWriter
109         SetReadDeadline(t time.Time) error
110         SetWriteDeadline(t time.Time) error
111 }
112
113 func init() {
114         var buf bytes.Buffer
115         spHead := SPHead{Type: SPTypeHalt}
116         if _, err := xdr.Marshal(&buf, spHead); err != nil {
117                 panic(err)
118         }
119         copy(SPHaltMarshalized, buf.Bytes())
120         SPHeadOverhead = buf.Len()
121         buf.Reset()
122
123         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
124         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
125                 panic(err)
126         }
127         SPInfoOverhead = buf.Len()
128         buf.Reset()
129
130         spFreq := SPFreq{Hash: new([32]byte), Offset: 123}
131         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
132                 panic(err)
133         }
134         SPFreqOverhead = buf.Len()
135         buf.Reset()
136
137         spFile := SPFile{Hash: new([32]byte), Offset: 123}
138         if _, err := xdr.Marshal(&buf, spFile); err != nil {
139                 panic(err)
140         }
141         SPFileOverhead = buf.Len()
142 }
143
144 func MarshalSP(typ SPType, sp interface{}) []byte {
145         var buf bytes.Buffer
146         var err error
147         if _, err = xdr.Marshal(&buf, SPHead{typ}); err != nil {
148                 panic(err)
149         }
150         if _, err = xdr.Marshal(&buf, sp); err != nil {
151                 panic(err)
152         }
153         return buf.Bytes()
154 }
155
156 func payloadsSplit(payloads [][]byte) [][]byte {
157         var outbounds [][]byte
158         outbound := make([]byte, 0, MaxSPSize)
159         for i, payload := range payloads {
160                 outbound = append(outbound, payload...)
161                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
162                         outbounds = append(outbounds, outbound)
163                         outbound = make([]byte, 0, MaxSPSize)
164                 }
165         }
166         if len(outbound) > 0 {
167                 outbounds = append(outbounds, outbound)
168         }
169         return outbounds
170 }
171
172 type SPState struct {
173         ctx            *Ctx
174         Node           *Node
175         onlineDeadline uint
176         maxOnlineTime  uint
177         nice           uint8
178         hs             *noise.HandshakeState
179         csOur          *noise.CipherState
180         csTheir        *noise.CipherState
181         payloads       chan []byte
182         infosTheir     map[[32]byte]*SPInfo
183         infosOurSeen   map[[32]byte]uint8
184         queueTheir     []*FreqWithNice
185         wg             sync.WaitGroup
186         RxBytes        int64
187         RxLastSeen     time.Time
188         TxBytes        int64
189         TxLastSeen     time.Time
190         started        time.Time
191         Duration       time.Duration
192         RxSpeed        int64
193         TxSpeed        int64
194         rxLock         *os.File
195         txLock         *os.File
196         xxOnly         TRxTx
197         rxRate         int
198         txRate         int
199         isDead         bool
200         listOnly       bool
201         onlyPkts       map[[32]byte]bool
202         sync.RWMutex
203 }
204
205 func (state *SPState) NotAlive() bool {
206         if state.isDead {
207                 return true
208         }
209         now := time.Now()
210         if state.maxOnlineTime > 0 && state.started.Add(time.Duration(state.maxOnlineTime)*time.Second).Before(now) {
211                 return true
212         }
213         return uint(now.Sub(state.RxLastSeen).Seconds()) >= state.onlineDeadline &&
214                 uint(now.Sub(state.TxLastSeen).Seconds()) >= state.onlineDeadline
215 }
216
217 func (state *SPState) dirUnlock() {
218         state.ctx.UnlockDir(state.rxLock)
219         state.ctx.UnlockDir(state.txLock)
220 }
221
222 func (state *SPState) WriteSP(dst io.Writer, payload []byte) error {
223         n, err := xdr.Marshal(dst, SPRaw{Magic: MagicNNCPLv1, Payload: payload})
224         if err == nil {
225                 state.TxLastSeen = time.Now()
226                 state.TxBytes += int64(n)
227         }
228         return err
229 }
230
231 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
232         var sp SPRaw
233         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
234         if err != nil {
235                 return nil, err
236         }
237         state.RxLastSeen = time.Now()
238         state.RxBytes += int64(n)
239         if sp.Magic != MagicNNCPLv1 {
240                 return nil, BadMagic
241         }
242         return sp.Payload, nil
243 }
244
245 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
246         var infos []*SPInfo
247         var totalSize int64
248         for job := range ctx.Jobs(nodeId, TTx) {
249                 job.Fd.Close()
250                 if job.PktEnc.Nice > nice {
251                         continue
252                 }
253                 if _, known := (*seen)[*job.HshValue]; known {
254                         continue
255                 }
256                 totalSize += job.Size
257                 infos = append(infos, &SPInfo{
258                         Nice: job.PktEnc.Nice,
259                         Size: uint64(job.Size),
260                         Hash: job.HshValue,
261                 })
262                 (*seen)[*job.HshValue] = job.PktEnc.Nice
263         }
264         sort.Sort(ByNice(infos))
265         var payloads [][]byte
266         for _, info := range infos {
267                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
268                 ctx.LogD("sp-info-our", SDS{
269                         "node": nodeId,
270                         "name": ToBase32(info.Hash[:]),
271                         "size": strconv.FormatInt(int64(info.Size), 10),
272                 }, "")
273         }
274         if totalSize > 0 {
275                 ctx.LogI("sp-infos", SDS{
276                         "xx":   string(TTx),
277                         "node": nodeId,
278                         "pkts": strconv.Itoa(len(payloads)),
279                         "size": strconv.FormatInt(totalSize, 10),
280                 }, "")
281         }
282         return payloadsSplit(payloads)
283 }
284
285 func (ctx *Ctx) StartI(
286         conn ConnDeadlined,
287         nodeId *NodeId,
288         nice uint8,
289         xxOnly TRxTx,
290         rxRate, txRate int,
291         onlineDeadline, maxOnlineTime uint,
292         listOnly bool,
293         onlyPkts map[[32]byte]bool,
294 ) (*SPState, error) {
295         err := ctx.ensureRxDir(nodeId)
296         if err != nil {
297                 return nil, err
298         }
299         var rxLock *os.File
300         if !listOnly && (xxOnly == "" || xxOnly == TRx) {
301                 rxLock, err = ctx.LockDir(nodeId, TRx)
302                 if err != nil {
303                         return nil, err
304                 }
305         }
306         var txLock *os.File
307         if !listOnly && (xxOnly == "" || xxOnly == TTx) {
308                 txLock, err = ctx.LockDir(nodeId, TTx)
309                 if err != nil {
310                         return nil, err
311                 }
312         }
313         started := time.Now()
314         node := ctx.Neigh[*nodeId]
315         conf := noise.Config{
316                 CipherSuite: NoiseCipherSuite,
317                 Pattern:     noise.HandshakeIK,
318                 Initiator:   true,
319                 StaticKeypair: noise.DHKey{
320                         Private: ctx.Self.NoisePrv[:],
321                         Public:  ctx.Self.NoisePub[:],
322                 },
323                 PeerStatic: node.NoisePub[:],
324         }
325         hs, err := noise.NewHandshakeState(conf)
326         if err != nil {
327                 return nil, err
328         }
329         state := SPState{
330                 ctx:            ctx,
331                 hs:             hs,
332                 Node:           node,
333                 onlineDeadline: onlineDeadline,
334                 maxOnlineTime:  maxOnlineTime,
335                 nice:           nice,
336                 payloads:       make(chan []byte),
337                 infosTheir:     make(map[[32]byte]*SPInfo),
338                 infosOurSeen:   make(map[[32]byte]uint8),
339                 started:        started,
340                 rxLock:         rxLock,
341                 txLock:         txLock,
342                 xxOnly:         xxOnly,
343                 rxRate:         rxRate,
344                 txRate:         txRate,
345                 listOnly:       listOnly,
346                 onlyPkts:       onlyPkts,
347         }
348
349         var infosPayloads [][]byte
350         if !listOnly && (xxOnly == "" || xxOnly == TTx) {
351                 infosPayloads = ctx.infosOur(nodeId, nice, &state.infosOurSeen)
352         }
353         var firstPayload []byte
354         if len(infosPayloads) > 0 {
355                 firstPayload = infosPayloads[0]
356         }
357         // Pad first payload, to hide actual number of existing files
358         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
359                 firstPayload = append(firstPayload, SPHaltMarshalized...)
360         }
361
362         var buf []byte
363         var payload []byte
364         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
365         if err != nil {
366                 state.dirUnlock()
367                 return nil, err
368         }
369         sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(nice))}
370         ctx.LogD("sp-start", sds, "sending first message")
371         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
372         if err = state.WriteSP(conn, buf); err != nil {
373                 ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
374                 state.dirUnlock()
375                 return nil, err
376         }
377         ctx.LogD("sp-start", sds, "waiting for first message")
378         conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
379         if buf, err = state.ReadSP(conn); err != nil {
380                 ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
381                 state.dirUnlock()
382                 return nil, err
383         }
384         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
385         if err != nil {
386                 ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
387                 state.dirUnlock()
388                 return nil, err
389         }
390         ctx.LogD("sp-start", sds, "starting workers")
391         err = state.StartWorkers(conn, infosPayloads, payload)
392         if err != nil {
393                 ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
394                 state.dirUnlock()
395                 return nil, err
396         }
397         return &state, err
398 }
399
400 func (ctx *Ctx) StartR(conn ConnDeadlined, nice uint8, xxOnly TRxTx) (*SPState, error) {
401         started := time.Now()
402         conf := noise.Config{
403                 CipherSuite: NoiseCipherSuite,
404                 Pattern:     noise.HandshakeIK,
405                 Initiator:   false,
406                 StaticKeypair: noise.DHKey{
407                         Private: ctx.Self.NoisePrv[:],
408                         Public:  ctx.Self.NoisePub[:],
409                 },
410         }
411         hs, err := noise.NewHandshakeState(conf)
412         if err != nil {
413                 return nil, err
414         }
415         state := SPState{
416                 ctx:          ctx,
417                 hs:           hs,
418                 nice:         nice,
419                 payloads:     make(chan []byte),
420                 infosOurSeen: make(map[[32]byte]uint8),
421                 infosTheir:   make(map[[32]byte]*SPInfo),
422                 started:      started,
423                 xxOnly:       xxOnly,
424         }
425         var buf []byte
426         var payload []byte
427         ctx.LogD(
428                 "sp-start",
429                 SDS{"nice": strconv.Itoa(int(nice))},
430                 "waiting for first message",
431         )
432         conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
433         if buf, err = state.ReadSP(conn); err != nil {
434                 ctx.LogE("sp-start", SDS{"err": err}, "")
435                 return nil, err
436         }
437         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
438                 ctx.LogE("sp-start", SDS{"err": err}, "")
439                 return nil, err
440         }
441
442         var node *Node
443         for _, node = range ctx.Neigh {
444                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), node.NoisePub[:]) == 1 {
445                         break
446                 }
447         }
448         if node == nil {
449                 peerId := ToBase32(state.hs.PeerStatic())
450                 ctx.LogE("sp-start", SDS{"peer": peerId}, "unknown")
451                 return nil, errors.New("Unknown peer: " + peerId)
452         }
453         state.Node = node
454         state.rxRate = node.RxRate
455         state.txRate = node.TxRate
456         state.onlineDeadline = node.OnlineDeadline
457         state.maxOnlineTime = node.MaxOnlineTime
458         sds := SDS{"node": node.Id, "nice": strconv.Itoa(int(nice))}
459
460         if ctx.ensureRxDir(node.Id); err != nil {
461                 return nil, err
462         }
463         var rxLock *os.File
464         if xxOnly == "" || xxOnly == TRx {
465                 rxLock, err = ctx.LockDir(node.Id, TRx)
466                 if err != nil {
467                         return nil, err
468                 }
469         }
470         state.rxLock = rxLock
471         var txLock *os.File
472         if xxOnly == "" || xxOnly == TTx {
473                 txLock, err = ctx.LockDir(node.Id, TTx)
474                 if err != nil {
475                         return nil, err
476                 }
477         }
478         state.txLock = txLock
479
480         var infosPayloads [][]byte
481         if xxOnly == "" || xxOnly == TTx {
482                 infosPayloads = ctx.infosOur(node.Id, nice, &state.infosOurSeen)
483         }
484         var firstPayload []byte
485         if len(infosPayloads) > 0 {
486                 firstPayload = infosPayloads[0]
487         }
488         // Pad first payload, to hide actual number of existing files
489         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
490                 firstPayload = append(firstPayload, SPHaltMarshalized...)
491         }
492
493         ctx.LogD("sp-start", sds, "sending first message")
494         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
495         if err != nil {
496                 state.dirUnlock()
497                 return nil, err
498         }
499         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
500         if err = state.WriteSP(conn, buf); err != nil {
501                 ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
502                 state.dirUnlock()
503                 return nil, err
504         }
505         ctx.LogD("sp-start", sds, "starting workers")
506         err = state.StartWorkers(conn, infosPayloads, payload)
507         if err != nil {
508                 state.dirUnlock()
509                 return nil, err
510         }
511         return &state, err
512 }
513
514 func (state *SPState) StartWorkers(
515         conn ConnDeadlined,
516         infosPayloads [][]byte,
517         payload []byte) error {
518         sds := SDS{"node": state.Node.Id, "nice": strconv.Itoa(int(state.nice))}
519         if len(infosPayloads) > 1 {
520                 go func() {
521                         for _, payload := range infosPayloads[1:] {
522                                 state.ctx.LogD(
523                                         "sp-work",
524                                         SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
525                                         "queuing remaining payload",
526                                 )
527                                 state.payloads <- payload
528                         }
529                 }()
530         }
531         state.ctx.LogD(
532                 "sp-work",
533                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
534                 "processing first payload",
535         )
536         replies, err := state.ProcessSP(payload)
537         if err != nil {
538                 state.ctx.LogE("sp-work", SdsAdd(sds, SDS{"err": err}), "")
539                 return err
540         }
541
542         go func() {
543                 for _, reply := range replies {
544                         state.ctx.LogD(
545                                 "sp-work",
546                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}),
547                                 "queuing reply",
548                         )
549                         state.payloads <- reply
550                 }
551         }()
552
553         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
554                 go func() {
555                         for range time.Tick(time.Second) {
556                                 if state.NotAlive() {
557                                         return
558                                 }
559                                 for _, payload := range state.ctx.infosOur(
560                                         state.Node.Id,
561                                         state.nice,
562                                         &state.infosOurSeen,
563                                 ) {
564                                         state.ctx.LogD(
565                                                 "sp-work",
566                                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
567                                                 "queuing new info",
568                                         )
569                                         state.payloads <- payload
570                                 }
571                         }
572                 }()
573         }
574
575         state.wg.Add(1)
576         go func() {
577                 defer func() {
578                         state.isDead = true
579                         state.wg.Done()
580                 }()
581                 for {
582                         if state.NotAlive() {
583                                 return
584                         }
585                         var payload []byte
586                         select {
587                         case payload = <-state.payloads:
588                                 state.ctx.LogD(
589                                         "sp-xmit",
590                                         SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
591                                         "got payload",
592                                 )
593                         default:
594                         }
595                         if payload == nil {
596                                 state.RLock()
597                                 if len(state.queueTheir) == 0 {
598                                         state.ctx.LogD("sp-xmit", sds, "file queue is empty")
599                                         state.RUnlock()
600                                         time.Sleep(100 * time.Millisecond)
601                                         continue
602                                 }
603                                 freq := state.queueTheir[0].freq
604                                 state.RUnlock()
605
606                                 if state.txRate > 0 {
607                                         time.Sleep(time.Second / time.Duration(state.txRate))
608                                 }
609
610                                 sdsp := SdsAdd(sds, SDS{
611                                         "xx":   string(TTx),
612                                         "hash": ToBase32(freq.Hash[:]),
613                                         "size": strconv.FormatInt(int64(freq.Offset), 10),
614                                 })
615                                 state.ctx.LogD("sp-file", sdsp, "queueing")
616                                 fd, err := os.Open(filepath.Join(
617                                         state.ctx.Spool,
618                                         state.Node.Id.String(),
619                                         string(TTx),
620                                         ToBase32(freq.Hash[:]),
621                                 ))
622                                 if err != nil {
623                                         state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
624                                         break
625                                 }
626                                 fi, err := fd.Stat()
627                                 if err != nil {
628                                         state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
629                                         break
630                                 }
631                                 fullSize := uint64(fi.Size())
632                                 var buf []byte
633                                 if freq.Offset < fullSize {
634                                         state.ctx.LogD("sp-file", sdsp, "seeking")
635                                         if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
636                                                 state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
637                                                 break
638                                         }
639                                         buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
640                                         n, err := fd.Read(buf)
641                                         if err != nil {
642                                                 state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
643                                                 break
644                                         }
645                                         buf = buf[:n]
646                                         state.ctx.LogD(
647                                                 "sp-file",
648                                                 SdsAdd(sdsp, SDS{"size": strconv.Itoa(n)}),
649                                                 "read",
650                                         )
651                                 }
652                                 fd.Close()
653                                 payload = MarshalSP(SPTypeFile, SPFile{
654                                         Hash:    freq.Hash,
655                                         Offset:  freq.Offset,
656                                         Payload: buf,
657                                 })
658                                 ourSize := freq.Offset + uint64(len(buf))
659                                 sdsp["size"] = strconv.FormatInt(int64(ourSize), 10)
660                                 sdsp["fullsize"] = strconv.FormatInt(int64(fullSize), 10)
661                                 state.ctx.LogP("sp-file", sdsp, "")
662                                 state.Lock()
663                                 if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
664                                         if ourSize == fullSize {
665                                                 state.ctx.LogD("sp-file", sdsp, "finished")
666                                                 if len(state.queueTheir) > 1 {
667                                                         state.queueTheir = state.queueTheir[1:]
668                                                 } else {
669                                                         state.queueTheir = state.queueTheir[:0]
670                                                 }
671                                         } else {
672                                                 state.queueTheir[0].freq.Offset += uint64(len(buf))
673                                         }
674                                 } else {
675                                         state.ctx.LogD("sp-file", sdsp, "queue disappeared")
676                                 }
677                                 state.Unlock()
678                         }
679                         state.ctx.LogD(
680                                 "sp-xmit",
681                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
682                                 "sending",
683                         )
684                         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
685                         if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload)); err != nil {
686                                 state.ctx.LogE("sp-xmit", SdsAdd(sds, SDS{"err": err}), "")
687                                 break
688                         }
689                 }
690         }()
691
692         state.wg.Add(1)
693         go func() {
694                 defer func() {
695                         state.isDead = true
696                         state.wg.Done()
697                 }()
698                 for {
699                         if state.NotAlive() {
700                                 return
701                         }
702                         state.ctx.LogD("sp-recv", sds, "waiting for payload")
703                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
704                         payload, err := state.ReadSP(conn)
705                         if err != nil {
706                                 unmarshalErr := err.(*xdr.UnmarshalError)
707                                 netErr, ok := unmarshalErr.Err.(net.Error)
708                                 if ok && netErr.Timeout() {
709                                         continue
710                                 }
711                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
712                                         break
713                                 }
714                                 state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "")
715                                 break
716                         }
717                         state.ctx.LogD(
718                                 "sp-recv",
719                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
720                                 "got payload",
721                         )
722                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
723                         if err != nil {
724                                 state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "")
725                                 break
726                         }
727                         state.ctx.LogD(
728                                 "sp-recv",
729                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
730                                 "processing",
731                         )
732                         replies, err := state.ProcessSP(payload)
733                         if err != nil {
734                                 state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "")
735                                 break
736                         }
737                         go func() {
738                                 for _, reply := range replies {
739                                         state.ctx.LogD(
740                                                 "sp-recv",
741                                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}),
742                                                 "queuing reply",
743                                         )
744                                         state.payloads <- reply
745                                 }
746                         }()
747                         if state.rxRate > 0 {
748                                 time.Sleep(time.Second / time.Duration(state.rxRate))
749                         }
750                 }
751         }()
752
753         return nil
754 }
755
756 func (state *SPState) Wait() {
757         state.wg.Wait()
758         state.dirUnlock()
759         state.Duration = time.Now().Sub(state.started)
760         state.RxSpeed = state.RxBytes
761         state.TxSpeed = state.TxBytes
762         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
763         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
764         if rxDuration > 0 {
765                 state.RxSpeed = state.RxBytes / rxDuration
766         }
767         if txDuration > 0 {
768                 state.TxSpeed = state.TxBytes / txDuration
769         }
770 }
771
772 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
773         sds := SDS{"node": state.Node.Id, "nice": strconv.Itoa(int(state.nice))}
774         r := bytes.NewReader(payload)
775         var err error
776         var replies [][]byte
777         var infosGot bool
778         for r.Len() > 0 {
779                 state.ctx.LogD("sp-process", sds, "unmarshaling header")
780                 var head SPHead
781                 if _, err = xdr.Unmarshal(r, &head); err != nil {
782                         state.ctx.LogE("sp-process", SdsAdd(sds, SDS{"err": err}), "")
783                         return nil, err
784                 }
785                 switch head.Type {
786                 case SPTypeInfo:
787                         infosGot = true
788                         sdsp := SdsAdd(sds, SDS{"type": "info"})
789                         state.ctx.LogD("sp-process", sdsp, "unmarshaling packet")
790                         var info SPInfo
791                         if _, err = xdr.Unmarshal(r, &info); err != nil {
792                                 state.ctx.LogE("sp-process", SdsAdd(sdsp, SDS{"err": err}), "")
793                                 return nil, err
794                         }
795                         sdsp = SdsAdd(sds, SDS{
796                                 "hash": ToBase32(info.Hash[:]),
797                                 "size": strconv.FormatInt(int64(info.Size), 10),
798                                 "nice": strconv.Itoa(int(info.Nice)),
799                         })
800                         if !state.listOnly && info.Nice > state.nice {
801                                 state.ctx.LogD("sp-process", sdsp, "too nice")
802                                 continue
803                         }
804                         state.ctx.LogD("sp-process", sdsp, "received")
805                         if !state.listOnly && state.xxOnly == TTx {
806                                 continue
807                         }
808                         state.Lock()
809                         state.infosTheir[*info.Hash] = &info
810                         state.Unlock()
811                         state.ctx.LogD("sp-process", sdsp, "stating part")
812                         pktPath := filepath.Join(
813                                 state.ctx.Spool,
814                                 state.Node.Id.String(),
815                                 string(TRx),
816                                 ToBase32(info.Hash[:]),
817                         )
818                         if _, err = os.Stat(pktPath); err == nil {
819                                 state.ctx.LogI("sp-info", sdsp, "already done")
820                                 if !state.listOnly {
821                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
822                                 }
823                                 continue
824                         }
825                         if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
826                                 state.ctx.LogI("sp-info", sdsp, "already seen")
827                                 if !state.listOnly {
828                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
829                                 }
830                                 continue
831                         }
832                         fi, err := os.Stat(pktPath + PartSuffix)
833                         var offset int64
834                         if err == nil {
835                                 offset = fi.Size()
836                         }
837                         if !state.ctx.IsEnoughSpace(int64(info.Size) - offset) {
838                                 state.ctx.LogI("sp-info", sdsp, "not enough space")
839                                 continue
840                         }
841                         state.ctx.LogI(
842                                 "sp-info",
843                                 SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(offset, 10)}),
844                                 "",
845                         )
846                         if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
847                                 replies = append(replies, MarshalSP(
848                                         SPTypeFreq,
849                                         SPFreq{info.Hash, uint64(offset)},
850                                 ))
851                         }
852                 case SPTypeFile:
853                         sdsp := SdsAdd(sds, SDS{"type": "file"})
854                         state.ctx.LogD("sp-process", sdsp, "unmarshaling packet")
855                         var file SPFile
856                         if _, err = xdr.Unmarshal(r, &file); err != nil {
857                                 state.ctx.LogE("sp-process", SdsAdd(sds, SDS{
858                                         "err":  err,
859                                         "type": "file",
860                                 }), "")
861                                 return nil, err
862                         }
863                         sdsp["xx"] = string(TRx)
864                         sdsp["hash"] = ToBase32(file.Hash[:])
865                         sdsp["size"] = strconv.Itoa(len(file.Payload))
866                         filePath := filepath.Join(
867                                 state.ctx.Spool,
868                                 state.Node.Id.String(),
869                                 string(TRx),
870                                 ToBase32(file.Hash[:]),
871                         )
872                         state.ctx.LogD("sp-file", sdsp, "opening part")
873                         fd, err := os.OpenFile(
874                                 filePath+PartSuffix,
875                                 os.O_RDWR|os.O_CREATE,
876                                 os.FileMode(0600),
877                         )
878                         if err != nil {
879                                 state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
880                                 return nil, err
881                         }
882                         state.ctx.LogD(
883                                 "sp-file",
884                                 SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(int64(file.Offset), 10)}),
885                                 "seeking",
886                         )
887                         if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
888                                 state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
889                                 fd.Close()
890                                 return nil, err
891                         }
892                         state.ctx.LogD("sp-file", sdsp, "writing")
893                         _, err = fd.Write(file.Payload)
894                         if err != nil {
895                                 state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
896                                 fd.Close()
897                                 return nil, err
898                         }
899                         ourSize := uint64(file.Offset) + uint64(len(file.Payload))
900                         state.RLock()
901                         sdsp["fullsize"] = strconv.FormatInt(int64(state.infosTheir[*file.Hash].Size), 10)
902                         sdsp["size"] = strconv.FormatInt(int64(ourSize), 10)
903                         state.ctx.LogP("sp-file", sdsp, "")
904                         if state.infosTheir[*file.Hash].Size != ourSize {
905                                 state.RUnlock()
906                                 fd.Close()
907                                 continue
908                         }
909                         state.RUnlock()
910                         spWorkersGroup.Wait()
911                         spWorkersGroup.Add(1)
912                         go func() {
913                                 if err := fd.Sync(); err != nil {
914                                         state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "sync")
915                                         fd.Close()
916                                         return
917                                 }
918                                 state.wg.Add(1)
919                                 defer state.wg.Done()
920                                 fd.Seek(0, io.SeekStart)
921                                 state.ctx.LogD("sp-file", sdsp, "checking")
922                                 gut, err := Check(fd, file.Hash[:])
923                                 fd.Close()
924                                 if err != nil || !gut {
925                                         state.ctx.LogE("sp-file", sdsp, "checksum mismatch")
926                                         return
927                                 }
928                                 state.ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "")
929                                 os.Rename(filePath+PartSuffix, filePath)
930                                 state.Lock()
931                                 delete(state.infosTheir, *file.Hash)
932                                 state.Unlock()
933                                 spWorkersGroup.Done()
934                                 go func() {
935                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
936                                 }()
937                         }()
938                 case SPTypeDone:
939                         sdsp := SdsAdd(sds, SDS{"type": "done"})
940                         state.ctx.LogD("sp-process", sdsp, "unmarshaling packet")
941                         var done SPDone
942                         if _, err = xdr.Unmarshal(r, &done); err != nil {
943                                 state.ctx.LogE("sp-process", SdsAdd(sds, SDS{
944                                         "type": "done",
945                                         "err":  err,
946                                 }), "")
947                                 return nil, err
948                         }
949                         sdsp["hash"] = ToBase32(done.Hash[:])
950                         state.ctx.LogD("sp-done", sdsp, "removing")
951                         err := os.Remove(filepath.Join(
952                                 state.ctx.Spool,
953                                 state.Node.Id.String(),
954                                 string(TTx),
955                                 ToBase32(done.Hash[:]),
956                         ))
957                         sdsp["xx"] = string(TTx)
958                         if err == nil {
959                                 state.ctx.LogI("sp-done", sdsp, "")
960                         } else {
961                                 state.ctx.LogE("sp-done", sdsp, "")
962                         }
963                 case SPTypeFreq:
964                         sdsp := SdsAdd(sds, SDS{"type": "freq"})
965                         state.ctx.LogD("sp-process", sdsp, "unmarshaling packet")
966                         var freq SPFreq
967                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
968                                 state.ctx.LogE("sp-process", SdsAdd(sdsp, SDS{"err": err}), "")
969                                 return nil, err
970                         }
971                         sdsp["hash"] = ToBase32(freq.Hash[:])
972                         sdsp["offset"] = strconv.FormatInt(int64(freq.Offset), 10)
973                         state.ctx.LogD("sp-process", sdsp, "queueing")
974                         nice, exists := state.infosOurSeen[*freq.Hash]
975                         if exists {
976                                 state.Lock()
977                                 insertIdx := 0
978                                 var freqWithNice *FreqWithNice
979                                 for insertIdx, freqWithNice = range state.queueTheir {
980                                         if freqWithNice.nice > nice {
981                                                 break
982                                         }
983                                 }
984                                 state.queueTheir = append(state.queueTheir, nil)
985                                 copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
986                                 state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
987                                 state.Unlock()
988                         } else {
989                                 state.ctx.LogD("sp-process", sdsp, "unknown")
990                         }
991                 case SPTypeHalt:
992                         state.ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "halt"}), "")
993                         state.Lock()
994                         state.queueTheir = nil
995                         state.Unlock()
996                 default:
997                         state.ctx.LogE(
998                                 "sp-process",
999                                 SdsAdd(sds, SDS{"type": head.Type}),
1000                                 "unknown",
1001                         )
1002                         return nil, BadPktType
1003                 }
1004         }
1005         if infosGot {
1006                 var pkts int
1007                 var size uint64
1008                 state.RLock()
1009                 for _, info := range state.infosTheir {
1010                         pkts++
1011                         size += info.Size
1012                 }
1013                 state.RUnlock()
1014                 state.ctx.LogI("sp-infos", SDS{
1015                         "xx":   string(TRx),
1016                         "node": state.Node.Id,
1017                         "pkts": strconv.Itoa(pkts),
1018                         "size": strconv.FormatInt(int64(size), 10),
1019                 }, "")
1020         }
1021         return payloadsSplit(replies), nil
1022 }