]> Cypherpunks.ru repositories - nncp.git/blob - src/cypherpunks.ru/nncp/sp.go
nncp-call -list option
[nncp.git] / src / cypherpunks.ru / nncp / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2019 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 package nncp
20
21 import (
22         "bytes"
23         "crypto/subtle"
24         "errors"
25         "io"
26         "net"
27         "os"
28         "path/filepath"
29         "sort"
30         "strconv"
31         "sync"
32         "time"
33
34         "github.com/davecgh/go-xdr/xdr2"
35         "github.com/flynn/noise"
36 )
37
// Protocol-wide limits and defaults.
const (
	// MaxSPSize is the upper bound, in bytes, used when batching
	// marshalled packets into a single payload.
	MaxSPSize = 1<<16 - 256

	// PartSuffix is the file name suffix ".part".
	PartSuffix = ".part"

	// DefaultDeadline is the number of seconds applied to every
	// connection read and write deadline. It is deliberately left
	// untyped so it can be multiplied by a time.Duration.
	DefaultDeadline = 10
)
43
44 var (
45         MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}
46
47         SPHeadOverhead    int
48         SPInfoOverhead    int
49         SPFreqOverhead    int
50         SPFileOverhead    int
51         SPHaltMarshalized []byte
52
53         NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
54                 noise.DH25519,
55                 noise.CipherChaChaPoly,
56                 noise.HashBLAKE2b,
57         )
58
59         spWorkersGroup sync.WaitGroup
60 )
61
// SPType tells which kind of packet follows an SPHead on the wire.
type SPType uint8

// Known packet kinds, numbered consecutively starting from zero.
const (
	SPTypeInfo SPType = iota // followed by an SPInfo
	SPTypeFreq               // followed by an SPFreq
	SPTypeFile               // followed by an SPFile
	SPTypeDone               // presumably followed by an SPDone; verify against ProcessSP
	SPTypeHalt               // header only, also used as padding
)
71
72 type SPHead struct {
73         Type SPType
74 }
75
// SPInfo announces a single packet available for transfer.
type SPInfo struct {
	Nice uint8     // niceness of the announced packet
	Size uint64    // full size of the packet in bytes
	Hash *[32]byte // hash identifying the packet
}
81
// SPFreq is a request for the contents of a packet, starting at a
// given byte offset.
type SPFreq struct {
	Hash   *[32]byte // hash identifying the requested packet
	Offset uint64    // byte offset transmission should start from
}
86
// SPFile carries one chunk of a packet's contents.
type SPFile struct {
	Hash    *[32]byte // hash identifying the packet
	Offset  uint64    // byte offset of Payload inside the packet
	Payload []byte    // chunk data; may be empty
}
92
// SPDone refers to a single packet by its hash.
// NOTE(review): presumably acknowledges a fully received packet;
// confirm against ProcessSP.
type SPDone struct {
	Hash *[32]byte // hash identifying the packet
}
96
// SPRaw is the outermost frame exchanged over the connection: a magic
// number followed by an opaque payload.
type SPRaw struct {
	Magic   [8]byte // expected to equal MagicNNCPLv1
	Payload []byte  // handshake message or encrypted packet batch
}
101
102 type FreqWithNice struct {
103         freq *SPFreq
104         nice uint8
105 }
106
// ConnDeadlined is a bidirectional byte stream that supports separate
// read and write deadlines, as net.Conn does.
type ConnDeadlined interface {
	io.Reader
	io.Writer

	// SetReadDeadline sets the deadline for subsequent reads.
	SetReadDeadline(t time.Time) error
	// SetWriteDeadline sets the deadline for subsequent writes.
	SetWriteDeadline(t time.Time) error
}
112
113 func init() {
114         var buf bytes.Buffer
115         spHead := SPHead{Type: SPTypeHalt}
116         if _, err := xdr.Marshal(&buf, spHead); err != nil {
117                 panic(err)
118         }
119         copy(SPHaltMarshalized, buf.Bytes())
120         SPHeadOverhead = buf.Len()
121         buf.Reset()
122
123         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
124         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
125                 panic(err)
126         }
127         SPInfoOverhead = buf.Len()
128         buf.Reset()
129
130         spFreq := SPFreq{Hash: new([32]byte), Offset: 123}
131         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
132                 panic(err)
133         }
134         SPFreqOverhead = buf.Len()
135         buf.Reset()
136
137         spFile := SPFile{Hash: new([32]byte), Offset: 123}
138         if _, err := xdr.Marshal(&buf, spFile); err != nil {
139                 panic(err)
140         }
141         SPFileOverhead = buf.Len()
142 }
143
144 func MarshalSP(typ SPType, sp interface{}) []byte {
145         var buf bytes.Buffer
146         var err error
147         if _, err = xdr.Marshal(&buf, SPHead{typ}); err != nil {
148                 panic(err)
149         }
150         if _, err = xdr.Marshal(&buf, sp); err != nil {
151                 panic(err)
152         }
153         return buf.Bytes()
154 }
155
156 func payloadsSplit(payloads [][]byte) [][]byte {
157         var outbounds [][]byte
158         outbound := make([]byte, 0, MaxSPSize)
159         for i, payload := range payloads {
160                 outbound = append(outbound, payload...)
161                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
162                         outbounds = append(outbounds, outbound)
163                         outbound = make([]byte, 0, MaxSPSize)
164                 }
165         }
166         if len(outbound) > 0 {
167                 outbounds = append(outbounds, outbound)
168         }
169         return outbounds
170 }
171
172 type SPState struct {
173         ctx            *Ctx
174         Node           *Node
175         onlineDeadline uint
176         maxOnlineTime  uint
177         nice           uint8
178         hs             *noise.HandshakeState
179         csOur          *noise.CipherState
180         csTheir        *noise.CipherState
181         payloads       chan []byte
182         infosTheir     map[[32]byte]*SPInfo
183         infosOurSeen   map[[32]byte]uint8
184         queueTheir     []*FreqWithNice
185         wg             sync.WaitGroup
186         RxBytes        int64
187         RxLastSeen     time.Time
188         TxBytes        int64
189         TxLastSeen     time.Time
190         started        time.Time
191         Duration       time.Duration
192         RxSpeed        int64
193         TxSpeed        int64
194         rxLock         *os.File
195         txLock         *os.File
196         xxOnly         TRxTx
197         rxRate         int
198         txRate         int
199         isDead         bool
200         listOnly       bool
201         sync.RWMutex
202 }
203
204 func (state *SPState) NotAlive() bool {
205         if state.isDead {
206                 return true
207         }
208         now := time.Now()
209         if state.maxOnlineTime > 0 && state.started.Add(time.Duration(state.maxOnlineTime)*time.Second).Before(now) {
210                 return true
211         }
212         return uint(now.Sub(state.RxLastSeen).Seconds()) >= state.onlineDeadline &&
213                 uint(now.Sub(state.TxLastSeen).Seconds()) >= state.onlineDeadline
214 }
215
216 func (state *SPState) dirUnlock() {
217         state.ctx.UnlockDir(state.rxLock)
218         state.ctx.UnlockDir(state.txLock)
219 }
220
221 func (state *SPState) WriteSP(dst io.Writer, payload []byte) error {
222         n, err := xdr.Marshal(dst, SPRaw{Magic: MagicNNCPLv1, Payload: payload})
223         if err == nil {
224                 state.TxLastSeen = time.Now()
225                 state.TxBytes += int64(n)
226         }
227         return err
228 }
229
230 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
231         var sp SPRaw
232         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
233         if err != nil {
234                 return nil, err
235         }
236         state.RxLastSeen = time.Now()
237         state.RxBytes += int64(n)
238         if sp.Magic != MagicNNCPLv1 {
239                 return nil, BadMagic
240         }
241         return sp.Payload, nil
242 }
243
244 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
245         var infos []*SPInfo
246         var totalSize int64
247         for job := range ctx.Jobs(nodeId, TTx) {
248                 job.Fd.Close()
249                 if job.PktEnc.Nice > nice {
250                         continue
251                 }
252                 if _, known := (*seen)[*job.HshValue]; known {
253                         continue
254                 }
255                 totalSize += job.Size
256                 infos = append(infos, &SPInfo{
257                         Nice: job.PktEnc.Nice,
258                         Size: uint64(job.Size),
259                         Hash: job.HshValue,
260                 })
261                 (*seen)[*job.HshValue] = job.PktEnc.Nice
262         }
263         sort.Sort(ByNice(infos))
264         var payloads [][]byte
265         for _, info := range infos {
266                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
267                 ctx.LogD("sp-info-our", SDS{
268                         "node": nodeId,
269                         "name": ToBase32(info.Hash[:]),
270                         "size": strconv.FormatInt(int64(info.Size), 10),
271                 }, "")
272         }
273         if totalSize > 0 {
274                 ctx.LogI("sp-infos", SDS{
275                         "xx":   string(TTx),
276                         "node": nodeId,
277                         "pkts": strconv.Itoa(len(payloads)),
278                         "size": strconv.FormatInt(totalSize, 10),
279                 }, "")
280         }
281         return payloadsSplit(payloads)
282 }
283
284 func (ctx *Ctx) StartI(
285         conn ConnDeadlined,
286         nodeId *NodeId,
287         nice uint8,
288         xxOnly TRxTx,
289         rxRate, txRate int,
290         onlineDeadline, maxOnlineTime uint,
291         listOnly bool) (*SPState, error) {
292         err := ctx.ensureRxDir(nodeId)
293         if err != nil {
294                 return nil, err
295         }
296         var rxLock *os.File
297         if !listOnly && (xxOnly == "" || xxOnly == TRx) {
298                 rxLock, err = ctx.LockDir(nodeId, TRx)
299                 if err != nil {
300                         return nil, err
301                 }
302         }
303         var txLock *os.File
304         if !listOnly && (xxOnly == "" || xxOnly == TTx) {
305                 txLock, err = ctx.LockDir(nodeId, TTx)
306                 if err != nil {
307                         return nil, err
308                 }
309         }
310         started := time.Now()
311         node := ctx.Neigh[*nodeId]
312         conf := noise.Config{
313                 CipherSuite: NoiseCipherSuite,
314                 Pattern:     noise.HandshakeIK,
315                 Initiator:   true,
316                 StaticKeypair: noise.DHKey{
317                         Private: ctx.Self.NoisePrv[:],
318                         Public:  ctx.Self.NoisePub[:],
319                 },
320                 PeerStatic: node.NoisePub[:],
321         }
322         hs, err := noise.NewHandshakeState(conf)
323         if err != nil {
324                 return nil, err
325         }
326         state := SPState{
327                 ctx:            ctx,
328                 hs:             hs,
329                 Node:           node,
330                 onlineDeadline: onlineDeadline,
331                 maxOnlineTime:  maxOnlineTime,
332                 nice:           nice,
333                 payloads:       make(chan []byte),
334                 infosTheir:     make(map[[32]byte]*SPInfo),
335                 infosOurSeen:   make(map[[32]byte]uint8),
336                 started:        started,
337                 rxLock:         rxLock,
338                 txLock:         txLock,
339                 xxOnly:         xxOnly,
340                 rxRate:         rxRate,
341                 txRate:         txRate,
342                 listOnly:       listOnly,
343         }
344
345         var infosPayloads [][]byte
346         if !listOnly && (xxOnly == "" || xxOnly == TTx) {
347                 infosPayloads = ctx.infosOur(nodeId, nice, &state.infosOurSeen)
348         }
349         var firstPayload []byte
350         if len(infosPayloads) > 0 {
351                 firstPayload = infosPayloads[0]
352         }
353         // Pad first payload, to hide actual number of existing files
354         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
355                 firstPayload = append(firstPayload, SPHaltMarshalized...)
356         }
357
358         var buf []byte
359         var payload []byte
360         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
361         if err != nil {
362                 state.dirUnlock()
363                 return nil, err
364         }
365         sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(nice))}
366         ctx.LogD("sp-start", sds, "sending first message")
367         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
368         if err = state.WriteSP(conn, buf); err != nil {
369                 ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
370                 state.dirUnlock()
371                 return nil, err
372         }
373         ctx.LogD("sp-start", sds, "waiting for first message")
374         conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
375         if buf, err = state.ReadSP(conn); err != nil {
376                 ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
377                 state.dirUnlock()
378                 return nil, err
379         }
380         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
381         if err != nil {
382                 ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
383                 state.dirUnlock()
384                 return nil, err
385         }
386         ctx.LogD("sp-start", sds, "starting workers")
387         err = state.StartWorkers(conn, infosPayloads, payload)
388         if err != nil {
389                 ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
390                 state.dirUnlock()
391                 return nil, err
392         }
393         return &state, err
394 }
395
396 func (ctx *Ctx) StartR(conn ConnDeadlined, nice uint8, xxOnly TRxTx) (*SPState, error) {
397         started := time.Now()
398         conf := noise.Config{
399                 CipherSuite: NoiseCipherSuite,
400                 Pattern:     noise.HandshakeIK,
401                 Initiator:   false,
402                 StaticKeypair: noise.DHKey{
403                         Private: ctx.Self.NoisePrv[:],
404                         Public:  ctx.Self.NoisePub[:],
405                 },
406         }
407         hs, err := noise.NewHandshakeState(conf)
408         if err != nil {
409                 return nil, err
410         }
411         state := SPState{
412                 ctx:          ctx,
413                 hs:           hs,
414                 nice:         nice,
415                 payloads:     make(chan []byte),
416                 infosOurSeen: make(map[[32]byte]uint8),
417                 infosTheir:   make(map[[32]byte]*SPInfo),
418                 started:      started,
419                 xxOnly:       xxOnly,
420         }
421         var buf []byte
422         var payload []byte
423         ctx.LogD(
424                 "sp-start",
425                 SDS{"nice": strconv.Itoa(int(nice))},
426                 "waiting for first message",
427         )
428         conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
429         if buf, err = state.ReadSP(conn); err != nil {
430                 ctx.LogE("sp-start", SDS{"err": err}, "")
431                 return nil, err
432         }
433         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
434                 ctx.LogE("sp-start", SDS{"err": err}, "")
435                 return nil, err
436         }
437
438         var node *Node
439         for _, node = range ctx.Neigh {
440                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), node.NoisePub[:]) == 1 {
441                         break
442                 }
443         }
444         if node == nil {
445                 peerId := ToBase32(state.hs.PeerStatic())
446                 ctx.LogE("sp-start", SDS{"peer": peerId}, "unknown")
447                 return nil, errors.New("Unknown peer: " + peerId)
448         }
449         state.Node = node
450         state.rxRate = node.RxRate
451         state.txRate = node.TxRate
452         state.onlineDeadline = node.OnlineDeadline
453         state.maxOnlineTime = node.MaxOnlineTime
454         sds := SDS{"node": node.Id, "nice": strconv.Itoa(int(nice))}
455
456         if ctx.ensureRxDir(node.Id); err != nil {
457                 return nil, err
458         }
459         var rxLock *os.File
460         if xxOnly == "" || xxOnly == TRx {
461                 rxLock, err = ctx.LockDir(node.Id, TRx)
462                 if err != nil {
463                         return nil, err
464                 }
465         }
466         state.rxLock = rxLock
467         var txLock *os.File
468         if xxOnly == "" || xxOnly == TTx {
469                 txLock, err = ctx.LockDir(node.Id, TTx)
470                 if err != nil {
471                         return nil, err
472                 }
473         }
474         state.txLock = txLock
475
476         var infosPayloads [][]byte
477         if xxOnly == "" || xxOnly == TTx {
478                 infosPayloads = ctx.infosOur(node.Id, nice, &state.infosOurSeen)
479         }
480         var firstPayload []byte
481         if len(infosPayloads) > 0 {
482                 firstPayload = infosPayloads[0]
483         }
484         // Pad first payload, to hide actual number of existing files
485         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
486                 firstPayload = append(firstPayload, SPHaltMarshalized...)
487         }
488
489         ctx.LogD("sp-start", sds, "sending first message")
490         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
491         if err != nil {
492                 state.dirUnlock()
493                 return nil, err
494         }
495         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
496         if err = state.WriteSP(conn, buf); err != nil {
497                 ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
498                 state.dirUnlock()
499                 return nil, err
500         }
501         ctx.LogD("sp-start", sds, "starting workers")
502         err = state.StartWorkers(conn, infosPayloads, payload)
503         if err != nil {
504                 state.dirUnlock()
505                 return nil, err
506         }
507         return &state, err
508 }
509
510 func (state *SPState) StartWorkers(
511         conn ConnDeadlined,
512         infosPayloads [][]byte,
513         payload []byte) error {
514         sds := SDS{"node": state.Node.Id, "nice": strconv.Itoa(int(state.nice))}
515         if len(infosPayloads) > 1 {
516                 go func() {
517                         for _, payload := range infosPayloads[1:] {
518                                 state.ctx.LogD(
519                                         "sp-work",
520                                         SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
521                                         "queuing remaining payload",
522                                 )
523                                 state.payloads <- payload
524                         }
525                 }()
526         }
527         state.ctx.LogD(
528                 "sp-work",
529                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
530                 "processing first payload",
531         )
532         replies, err := state.ProcessSP(payload)
533         if err != nil {
534                 state.ctx.LogE("sp-work", SdsAdd(sds, SDS{"err": err}), "")
535                 return err
536         }
537
538         go func() {
539                 for _, reply := range replies {
540                         state.ctx.LogD(
541                                 "sp-work",
542                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}),
543                                 "queuing reply",
544                         )
545                         state.payloads <- reply
546                 }
547         }()
548
549         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
550                 go func() {
551                         for range time.Tick(time.Second) {
552                                 if state.NotAlive() {
553                                         return
554                                 }
555                                 for _, payload := range state.ctx.infosOur(
556                                         state.Node.Id,
557                                         state.nice,
558                                         &state.infosOurSeen,
559                                 ) {
560                                         state.ctx.LogD(
561                                                 "sp-work",
562                                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
563                                                 "queuing new info",
564                                         )
565                                         state.payloads <- payload
566                                 }
567                         }
568                 }()
569         }
570
571         state.wg.Add(1)
572         go func() {
573                 defer func() {
574                         state.isDead = true
575                         state.wg.Done()
576                 }()
577                 for {
578                         if state.NotAlive() {
579                                 return
580                         }
581                         var payload []byte
582                         select {
583                         case payload = <-state.payloads:
584                                 state.ctx.LogD(
585                                         "sp-xmit",
586                                         SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
587                                         "got payload",
588                                 )
589                         default:
590                         }
591                         if payload == nil {
592                                 state.RLock()
593                                 if len(state.queueTheir) == 0 {
594                                         state.ctx.LogD("sp-xmit", sds, "file queue is empty")
595                                         state.RUnlock()
596                                         time.Sleep(100 * time.Millisecond)
597                                         continue
598                                 }
599                                 freq := state.queueTheir[0].freq
600                                 state.RUnlock()
601
602                                 if state.txRate > 0 {
603                                         time.Sleep(time.Second / time.Duration(state.txRate))
604                                 }
605
606                                 sdsp := SdsAdd(sds, SDS{
607                                         "xx":   string(TTx),
608                                         "hash": ToBase32(freq.Hash[:]),
609                                         "size": strconv.FormatInt(int64(freq.Offset), 10),
610                                 })
611                                 state.ctx.LogD("sp-file", sdsp, "queueing")
612                                 fd, err := os.Open(filepath.Join(
613                                         state.ctx.Spool,
614                                         state.Node.Id.String(),
615                                         string(TTx),
616                                         ToBase32(freq.Hash[:]),
617                                 ))
618                                 if err != nil {
619                                         state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
620                                         break
621                                 }
622                                 fi, err := fd.Stat()
623                                 if err != nil {
624                                         state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
625                                         break
626                                 }
627                                 fullSize := uint64(fi.Size())
628                                 var buf []byte
629                                 if freq.Offset < fullSize {
630                                         state.ctx.LogD("sp-file", sdsp, "seeking")
631                                         if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
632                                                 state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
633                                                 break
634                                         }
635                                         buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
636                                         n, err := fd.Read(buf)
637                                         if err != nil {
638                                                 state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
639                                                 break
640                                         }
641                                         buf = buf[:n]
642                                         state.ctx.LogD(
643                                                 "sp-file",
644                                                 SdsAdd(sdsp, SDS{"size": strconv.Itoa(n)}),
645                                                 "read",
646                                         )
647                                 }
648                                 fd.Close()
649                                 payload = MarshalSP(SPTypeFile, SPFile{
650                                         Hash:    freq.Hash,
651                                         Offset:  freq.Offset,
652                                         Payload: buf,
653                                 })
654                                 ourSize := freq.Offset + uint64(len(buf))
655                                 sdsp["size"] = strconv.FormatInt(int64(ourSize), 10)
656                                 sdsp["fullsize"] = strconv.FormatInt(int64(fullSize), 10)
657                                 state.ctx.LogP("sp-file", sdsp, "")
658                                 state.Lock()
659                                 if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
660                                         if ourSize == fullSize {
661                                                 state.ctx.LogD("sp-file", sdsp, "finished")
662                                                 if len(state.queueTheir) > 1 {
663                                                         state.queueTheir = state.queueTheir[1:]
664                                                 } else {
665                                                         state.queueTheir = state.queueTheir[:0]
666                                                 }
667                                         } else {
668                                                 state.queueTheir[0].freq.Offset += uint64(len(buf))
669                                         }
670                                 } else {
671                                         state.ctx.LogD("sp-file", sdsp, "queue disappeared")
672                                 }
673                                 state.Unlock()
674                         }
675                         state.ctx.LogD(
676                                 "sp-xmit",
677                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
678                                 "sending",
679                         )
680                         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
681                         if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload)); err != nil {
682                                 state.ctx.LogE("sp-xmit", SdsAdd(sds, SDS{"err": err}), "")
683                                 break
684                         }
685                 }
686         }()
687
688         state.wg.Add(1)
689         go func() {
690                 defer func() {
691                         state.isDead = true
692                         state.wg.Done()
693                 }()
694                 for {
695                         if state.NotAlive() {
696                                 return
697                         }
698                         state.ctx.LogD("sp-recv", sds, "waiting for payload")
699                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
700                         payload, err := state.ReadSP(conn)
701                         if err != nil {
702                                 unmarshalErr := err.(*xdr.UnmarshalError)
703                                 netErr, ok := unmarshalErr.Err.(net.Error)
704                                 if ok && netErr.Timeout() {
705                                         continue
706                                 }
707                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
708                                         break
709                                 }
710                                 state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "")
711                                 break
712                         }
713                         state.ctx.LogD(
714                                 "sp-recv",
715                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
716                                 "got payload",
717                         )
718                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
719                         if err != nil {
720                                 state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "")
721                                 break
722                         }
723                         state.ctx.LogD(
724                                 "sp-recv",
725                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
726                                 "processing",
727                         )
728                         replies, err := state.ProcessSP(payload)
729                         if err != nil {
730                                 state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "")
731                                 break
732                         }
733                         go func() {
734                                 for _, reply := range replies {
735                                         state.ctx.LogD(
736                                                 "sp-recv",
737                                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}),
738                                                 "queuing reply",
739                                         )
740                                         state.payloads <- reply
741                                 }
742                         }()
743                         if state.rxRate > 0 {
744                                 time.Sleep(time.Second / time.Duration(state.rxRate))
745                         }
746                 }
747         }()
748
749         return nil
750 }
751
752 func (state *SPState) Wait() {
753         state.wg.Wait()
754         state.dirUnlock()
755         state.Duration = time.Now().Sub(state.started)
756         state.RxSpeed = state.RxBytes
757         state.TxSpeed = state.TxBytes
758         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
759         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
760         if rxDuration > 0 {
761                 state.RxSpeed = state.RxBytes / rxDuration
762         }
763         if txDuration > 0 {
764                 state.TxSpeed = state.TxBytes / txDuration
765         }
766 }
767
768 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
769         sds := SDS{"node": state.Node.Id, "nice": strconv.Itoa(int(state.nice))}
770         r := bytes.NewReader(payload)
771         var err error
772         var replies [][]byte
773         var infosGot bool
774         for r.Len() > 0 {
775                 state.ctx.LogD("sp-process", sds, "unmarshaling header")
776                 var head SPHead
777                 if _, err = xdr.Unmarshal(r, &head); err != nil {
778                         state.ctx.LogE("sp-process", SdsAdd(sds, SDS{"err": err}), "")
779                         return nil, err
780                 }
781                 switch head.Type {
782                 case SPTypeInfo:
783                         infosGot = true
784                         sdsp := SdsAdd(sds, SDS{"type": "info"})
785                         state.ctx.LogD("sp-process", sdsp, "unmarshaling packet")
786                         var info SPInfo
787                         if _, err = xdr.Unmarshal(r, &info); err != nil {
788                                 state.ctx.LogE("sp-process", SdsAdd(sdsp, SDS{"err": err}), "")
789                                 return nil, err
790                         }
791                         sdsp = SdsAdd(sds, SDS{
792                                 "hash": ToBase32(info.Hash[:]),
793                                 "size": strconv.FormatInt(int64(info.Size), 10),
794                                 "nice": strconv.Itoa(int(info.Nice)),
795                         })
796                         if !state.listOnly && info.Nice > state.nice {
797                                 state.ctx.LogD("sp-process", sdsp, "too nice")
798                                 continue
799                         }
800                         state.ctx.LogD("sp-process", sdsp, "received")
801                         if !state.listOnly && state.xxOnly == TTx {
802                                 continue
803                         }
804                         state.Lock()
805                         state.infosTheir[*info.Hash] = &info
806                         state.Unlock()
807                         state.ctx.LogD("sp-process", sdsp, "stating part")
808                         pktPath := filepath.Join(
809                                 state.ctx.Spool,
810                                 state.Node.Id.String(),
811                                 string(TRx),
812                                 ToBase32(info.Hash[:]),
813                         )
814                         if _, err = os.Stat(pktPath); err == nil {
815                                 state.ctx.LogI("sp-info", sdsp, "already done")
816                                 if !state.listOnly {
817                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
818                                 }
819                                 continue
820                         }
821                         if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
822                                 state.ctx.LogI("sp-info", sdsp, "already seen")
823                                 if !state.listOnly {
824                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
825                                 }
826                                 continue
827                         }
828                         fi, err := os.Stat(pktPath + PartSuffix)
829                         var offset int64
830                         if err == nil {
831                                 offset = fi.Size()
832                         }
833                         if !state.ctx.IsEnoughSpace(int64(info.Size) - offset) {
834                                 state.ctx.LogI("sp-info", sdsp, "not enough space")
835                                 continue
836                         }
837                         state.ctx.LogI(
838                                 "sp-info",
839                                 SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(offset, 10)}),
840                                 "",
841                         )
842                         if !state.listOnly {
843                                 replies = append(replies, MarshalSP(
844                                         SPTypeFreq,
845                                         SPFreq{info.Hash, uint64(offset)},
846                                 ))
847                         }
848                 case SPTypeFile:
849                         sdsp := SdsAdd(sds, SDS{"type": "file"})
850                         state.ctx.LogD("sp-process", sdsp, "unmarshaling packet")
851                         var file SPFile
852                         if _, err = xdr.Unmarshal(r, &file); err != nil {
853                                 state.ctx.LogE("sp-process", SdsAdd(sds, SDS{
854                                         "err":  err,
855                                         "type": "file",
856                                 }), "")
857                                 return nil, err
858                         }
859                         sdsp["xx"] = string(TRx)
860                         sdsp["hash"] = ToBase32(file.Hash[:])
861                         sdsp["size"] = strconv.Itoa(len(file.Payload))
862                         filePath := filepath.Join(
863                                 state.ctx.Spool,
864                                 state.Node.Id.String(),
865                                 string(TRx),
866                                 ToBase32(file.Hash[:]),
867                         )
868                         state.ctx.LogD("sp-file", sdsp, "opening part")
869                         fd, err := os.OpenFile(
870                                 filePath+PartSuffix,
871                                 os.O_RDWR|os.O_CREATE,
872                                 os.FileMode(0600),
873                         )
874                         if err != nil {
875                                 state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
876                                 return nil, err
877                         }
878                         state.ctx.LogD(
879                                 "sp-file",
880                                 SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(int64(file.Offset), 10)}),
881                                 "seeking",
882                         )
883                         if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
884                                 state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
885                                 fd.Close()
886                                 return nil, err
887                         }
888                         state.ctx.LogD("sp-file", sdsp, "writing")
889                         _, err = fd.Write(file.Payload)
890                         if err != nil {
891                                 state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
892                                 fd.Close()
893                                 return nil, err
894                         }
895                         ourSize := uint64(file.Offset) + uint64(len(file.Payload))
896                         state.RLock()
897                         sdsp["fullsize"] = strconv.FormatInt(int64(state.infosTheir[*file.Hash].Size), 10)
898                         sdsp["size"] = strconv.FormatInt(int64(ourSize), 10)
899                         state.ctx.LogP("sp-file", sdsp, "")
900                         if state.infosTheir[*file.Hash].Size != ourSize {
901                                 state.RUnlock()
902                                 fd.Close()
903                                 continue
904                         }
905                         state.RUnlock()
906                         spWorkersGroup.Wait()
907                         spWorkersGroup.Add(1)
908                         go func() {
909                                 if err := fd.Sync(); err != nil {
910                                         state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "sync")
911                                         fd.Close()
912                                         return
913                                 }
914                                 state.wg.Add(1)
915                                 defer state.wg.Done()
916                                 fd.Seek(0, io.SeekStart)
917                                 state.ctx.LogD("sp-file", sdsp, "checking")
918                                 gut, err := Check(fd, file.Hash[:])
919                                 fd.Close()
920                                 if err != nil || !gut {
921                                         state.ctx.LogE("sp-file", sdsp, "checksum mismatch")
922                                         return
923                                 }
924                                 state.ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "")
925                                 os.Rename(filePath+PartSuffix, filePath)
926                                 state.Lock()
927                                 delete(state.infosTheir, *file.Hash)
928                                 state.Unlock()
929                                 spWorkersGroup.Done()
930                                 go func() {
931                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
932                                 }()
933                         }()
934                 case SPTypeDone:
935                         sdsp := SdsAdd(sds, SDS{"type": "done"})
936                         state.ctx.LogD("sp-process", sdsp, "unmarshaling packet")
937                         var done SPDone
938                         if _, err = xdr.Unmarshal(r, &done); err != nil {
939                                 state.ctx.LogE("sp-process", SdsAdd(sds, SDS{
940                                         "type": "done",
941                                         "err":  err,
942                                 }), "")
943                                 return nil, err
944                         }
945                         sdsp["hash"] = ToBase32(done.Hash[:])
946                         state.ctx.LogD("sp-done", sdsp, "removing")
947                         err := os.Remove(filepath.Join(
948                                 state.ctx.Spool,
949                                 state.Node.Id.String(),
950                                 string(TTx),
951                                 ToBase32(done.Hash[:]),
952                         ))
953                         sdsp["xx"] = string(TTx)
954                         if err == nil {
955                                 state.ctx.LogI("sp-done", sdsp, "")
956                         } else {
957                                 state.ctx.LogE("sp-done", sdsp, "")
958                         }
959                 case SPTypeFreq:
960                         sdsp := SdsAdd(sds, SDS{"type": "freq"})
961                         state.ctx.LogD("sp-process", sdsp, "unmarshaling packet")
962                         var freq SPFreq
963                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
964                                 state.ctx.LogE("sp-process", SdsAdd(sdsp, SDS{"err": err}), "")
965                                 return nil, err
966                         }
967                         sdsp["hash"] = ToBase32(freq.Hash[:])
968                         sdsp["offset"] = strconv.FormatInt(int64(freq.Offset), 10)
969                         state.ctx.LogD("sp-process", sdsp, "queueing")
970                         nice, exists := state.infosOurSeen[*freq.Hash]
971                         if exists {
972                                 state.Lock()
973                                 insertIdx := 0
974                                 var freqWithNice *FreqWithNice
975                                 for insertIdx, freqWithNice = range state.queueTheir {
976                                         if freqWithNice.nice > nice {
977                                                 break
978                                         }
979                                 }
980                                 state.queueTheir = append(state.queueTheir, nil)
981                                 copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
982                                 state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
983                                 state.Unlock()
984                         } else {
985                                 state.ctx.LogD("sp-process", sdsp, "unknown")
986                         }
987                 case SPTypeHalt:
988                         state.ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "halt"}), "")
989                         state.Lock()
990                         state.queueTheir = nil
991                         state.Unlock()
992                 default:
993                         state.ctx.LogE(
994                                 "sp-process",
995                                 SdsAdd(sds, SDS{"type": head.Type}),
996                                 "unknown",
997                         )
998                         return nil, BadPktType
999                 }
1000         }
1001         if infosGot {
1002                 var pkts int
1003                 var size uint64
1004                 state.RLock()
1005                 for _, info := range state.infosTheir {
1006                         pkts++
1007                         size += info.Size
1008                 }
1009                 state.RUnlock()
1010                 state.ctx.LogI("sp-infos", SDS{
1011                         "xx":   string(TRx),
1012                         "node": state.Node.Id,
1013                         "pkts": strconv.Itoa(pkts),
1014                         "size": strconv.FormatInt(int64(size), 10),
1015                 }, "")
1016         }
1017         return payloadsSplit(replies), nil
1018 }