]> Cypherpunks.ru repositories - nncp.git/blob - src/sp.go
Operations progress
[nncp.git] / src / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2019 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bytes"
22         "crypto/subtle"
23         "errors"
24         "io"
25         "net"
26         "os"
27         "path/filepath"
28         "sort"
29         "sync"
30         "time"
31
32         xdr "github.com/davecgh/go-xdr/xdr2"
33         "github.com/flynn/noise"
34 )
35
const (
	// MaxSPSize is the size limit, in bytes, used when packing marshalled
	// packets into one outbound payload (payloadsSplit), when padding the
	// first handshake payload and when sizing file chunks.
	MaxSPSize = 1<<16 - 256
	// PartSuffix is a filename suffix. It is not referenced in this part of
	// the file; presumably it marks partially received packets -- TODO confirm.
	PartSuffix = ".part"
	// DefaultDeadline is the number of seconds added to time.Now() for every
	// read/write deadline set on the connection.
	DefaultDeadline = 10
)
41
var (
	// MagicNNCPLv1 is the 8-byte magic that WriteSP puts into every SPRaw
	// frame and that ReadSP verifies on reception.
	MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}

	// Marshalled sizes of the corresponding packet structures; all of them
	// are measured once in init() by XDR-encoding sample values.
	SPHeadOverhead int
	SPInfoOverhead int
	SPFreqOverhead int
	SPFileOverhead int
	// SPHaltMarshalized is meant to hold the XDR encoding of
	// SPHead{Type: SPTypeHalt}; StartI/StartR append it repeatedly to pad
	// the first handshake payload.
	SPHaltMarshalized []byte

	// NoiseCipherSuite is the Noise cipher suite used for the handshake:
	// Curve25519 DH, ChaCha20-Poly1305 cipher and BLAKE2b hash.
	NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
		noise.DH25519,
		noise.CipherChaChaPoly,
		noise.HashBLAKE2b,
	)

	// spWorkersGroup is not referenced in the visible part of this file.
	spWorkersGroup sync.WaitGroup
)
59
// SPType identifies the kind of packet that follows an SPHead on the wire.
type SPType uint8

// Packet type codes. The values are consecutive starting from zero and are
// part of the wire format, so the order must not be changed.
const (
	SPTypeInfo SPType = iota
	SPTypeFreq
	SPTypeFile
	SPTypeDone
	SPTypeHalt
)
69
// SPHead is the header that MarshalSP writes in front of every packet
// body; Type tells the receiver which body structure follows.
type SPHead struct {
	Type SPType
}
73
// SPInfo announces a single packet available on the sender's side.
// infosOur fills it from an outbound job: its niceness, its size and its
// hash value.
type SPInfo struct {
	Nice uint8
	Size uint64
	Hash *[32]byte
}
79
// SPFreq is a file request: the transmitting worker opens the spool file
// named after Hash and reads it starting at byte Offset.
type SPFreq struct {
	Hash   *[32]byte
	Offset uint64
}
84
// SPFile carries one chunk of a packet's contents: Payload is the data
// read from the file identified by Hash, starting at Offset.
type SPFile struct {
	Hash    *[32]byte
	Offset  uint64
	Payload []byte
}
90
// SPDone refers to a packet by its Hash. ProcessSP sends it as a reply to
// an SPInfo when the announced packet is already present locally.
type SPDone struct {
	Hash *[32]byte
}
94
// SPRaw is the outermost frame written to and read from the connection by
// WriteSP/ReadSP: a magic number followed by an opaque payload.
type SPRaw struct {
	Magic   [8]byte
	Payload []byte
}
99
// FreqWithNice pairs a file request with a niceness value; it is the
// element type of SPState.queueTheir.
// NOTE(review): nice is presumably the niceness of the requested packet,
// used for ordering the queue -- confirm where the queue is filled.
type FreqWithNice struct {
	freq *SPFreq
	nice uint8
}
104
// ConnDeadlined is the connection abstraction the protocol works with: a
// read/write/close stream that additionally accepts separate read and
// write deadlines.
type ConnDeadlined interface {
	io.ReadWriteCloser
	SetReadDeadline(t time.Time) error
	SetWriteDeadline(t time.Time) error
}
110
111 func init() {
112         var buf bytes.Buffer
113         spHead := SPHead{Type: SPTypeHalt}
114         if _, err := xdr.Marshal(&buf, spHead); err != nil {
115                 panic(err)
116         }
117         copy(SPHaltMarshalized, buf.Bytes())
118         SPHeadOverhead = buf.Len()
119         buf.Reset()
120
121         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
122         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
123                 panic(err)
124         }
125         SPInfoOverhead = buf.Len()
126         buf.Reset()
127
128         spFreq := SPFreq{Hash: new([32]byte), Offset: 123}
129         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
130                 panic(err)
131         }
132         SPFreqOverhead = buf.Len()
133         buf.Reset()
134
135         spFile := SPFile{Hash: new([32]byte), Offset: 123}
136         if _, err := xdr.Marshal(&buf, spFile); err != nil {
137                 panic(err)
138         }
139         SPFileOverhead = buf.Len()
140 }
141
142 func MarshalSP(typ SPType, sp interface{}) []byte {
143         var buf bytes.Buffer
144         var err error
145         if _, err = xdr.Marshal(&buf, SPHead{typ}); err != nil {
146                 panic(err)
147         }
148         if _, err = xdr.Marshal(&buf, sp); err != nil {
149                 panic(err)
150         }
151         return buf.Bytes()
152 }
153
154 func payloadsSplit(payloads [][]byte) [][]byte {
155         var outbounds [][]byte
156         outbound := make([]byte, 0, MaxSPSize)
157         for i, payload := range payloads {
158                 outbound = append(outbound, payload...)
159                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
160                         outbounds = append(outbounds, outbound)
161                         outbound = make([]byte, 0, MaxSPSize)
162                 }
163         }
164         if len(outbound) > 0 {
165                 outbounds = append(outbounds, outbound)
166         }
167         return outbounds
168 }
169
// SPState holds everything belonging to one synchronization session with
// a single node: handshake and cipher states, queues, traffic counters
// and the spool directory locks. The embedded RWMutex is taken around
// accesses to queueTheir and infosTheir.
type SPState struct {
	Ctx  *Ctx
	Node *Node
	// Nice is the niceness threshold: our packets and their infos with a
	// higher value are skipped.
	Nice uint8
	// onlineDeadline is the number of seconds both directions must have
	// been idle for NotAlive to report true.
	onlineDeadline uint
	// maxOnlineTime limits the whole session duration in seconds; zero
	// disables the limit.
	maxOnlineTime uint
	hs            *noise.HandshakeState
	// csOur encrypts outgoing payloads, csTheir decrypts incoming ones.
	csOur   *noise.CipherState
	csTheir *noise.CipherState
	// payloads feeds ready-made payloads to the transmitting worker.
	payloads chan []byte
	// infosTheir keeps the infos received from the peer, keyed by hash.
	infosTheir map[[32]byte]*SPInfo
	// infosOurSeen remembers hashes (with their niceness) of our packets
	// already announced by infosOur.
	infosOurSeen map[[32]byte]uint8
	// queueTheir is the queue of file requests; the transmitting worker
	// always serves its first element.
	queueTheir []*FreqWithNice
	// wg tracks the transmitting and the receiving worker.
	wg         sync.WaitGroup
	RxBytes    int64
	RxLastSeen time.Time
	TxBytes    int64
	TxLastSeen time.Time
	started    time.Time
	// Duration, RxSpeed and TxSpeed are filled in by Wait.
	Duration time.Duration
	RxSpeed  int64
	TxSpeed  int64
	// rxLock/txLock are the spool directory locks released by dirUnlock;
	// either may be nil.
	rxLock *os.File
	txLock *os.File
	// xxOnly restricts the session to one direction; empty means both.
	xxOnly TRxTx
	// rxRate/txRate, when positive, make the workers sleep
	// time.Second/rate between packets.
	rxRate int
	txRate int
	// isDead is set when either worker exits.
	isDead bool
	// listOnly: when set no directory locks are taken and no own infos
	// are sent -- presumably a "just list remote packets" mode.
	listOnly bool
	// onlyPkts is not referenced in the visible part of this file.
	onlyPkts map[[32]byte]bool
	sync.RWMutex
}
202
203 func (state *SPState) NotAlive() bool {
204         if state.isDead {
205                 return true
206         }
207         now := time.Now()
208         if state.maxOnlineTime > 0 && state.started.Add(time.Duration(state.maxOnlineTime)*time.Second).Before(now) {
209                 return true
210         }
211         return uint(now.Sub(state.RxLastSeen).Seconds()) >= state.onlineDeadline &&
212                 uint(now.Sub(state.TxLastSeen).Seconds()) >= state.onlineDeadline
213 }
214
215 func (state *SPState) dirUnlock() {
216         state.Ctx.UnlockDir(state.rxLock)
217         state.Ctx.UnlockDir(state.txLock)
218 }
219
220 func (state *SPState) WriteSP(dst io.Writer, payload []byte) error {
221         n, err := xdr.Marshal(dst, SPRaw{Magic: MagicNNCPLv1, Payload: payload})
222         if err == nil {
223                 state.TxLastSeen = time.Now()
224                 state.TxBytes += int64(n)
225         }
226         return err
227 }
228
229 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
230         var sp SPRaw
231         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
232         if err != nil {
233                 ue := err.(*xdr.UnmarshalError)
234                 if ue.Err == io.EOF {
235                         return nil, ue.Err
236                 }
237                 return nil, err
238         }
239         state.RxLastSeen = time.Now()
240         state.RxBytes += int64(n)
241         if sp.Magic != MagicNNCPLv1 {
242                 return nil, BadMagic
243         }
244         return sp.Payload, nil
245 }
246
247 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
248         var infos []*SPInfo
249         var totalSize int64
250         for job := range ctx.Jobs(nodeId, TTx) {
251                 job.Fd.Close()
252                 if job.PktEnc.Nice > nice {
253                         continue
254                 }
255                 if _, known := (*seen)[*job.HshValue]; known {
256                         continue
257                 }
258                 totalSize += job.Size
259                 infos = append(infos, &SPInfo{
260                         Nice: job.PktEnc.Nice,
261                         Size: uint64(job.Size),
262                         Hash: job.HshValue,
263                 })
264                 (*seen)[*job.HshValue] = job.PktEnc.Nice
265         }
266         sort.Sort(ByNice(infos))
267         var payloads [][]byte
268         for _, info := range infos {
269                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
270                 ctx.LogD("sp-info-our", SDS{
271                         "node": nodeId,
272                         "name": ToBase32(info.Hash[:]),
273                         "size": info.Size,
274                 }, "")
275         }
276         if totalSize > 0 {
277                 ctx.LogI("sp-infos", SDS{
278                         "xx":   string(TTx),
279                         "node": nodeId,
280                         "pkts": len(payloads),
281                         "size": totalSize,
282                 }, "")
283         }
284         return payloadsSplit(payloads)
285 }
286
287 func (state *SPState) StartI(conn ConnDeadlined) error {
288         nodeId := state.Node.Id
289         err := state.Ctx.ensureRxDir(nodeId)
290         if err != nil {
291                 return err
292         }
293         var rxLock *os.File
294         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
295                 rxLock, err = state.Ctx.LockDir(nodeId, TRx)
296                 if err != nil {
297                         return err
298                 }
299         }
300         var txLock *os.File
301         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
302                 txLock, err = state.Ctx.LockDir(nodeId, TTx)
303                 if err != nil {
304                         return err
305                 }
306         }
307         started := time.Now()
308         conf := noise.Config{
309                 CipherSuite: NoiseCipherSuite,
310                 Pattern:     noise.HandshakeIK,
311                 Initiator:   true,
312                 StaticKeypair: noise.DHKey{
313                         Private: state.Ctx.Self.NoisePrv[:],
314                         Public:  state.Ctx.Self.NoisePub[:],
315                 },
316                 PeerStatic: state.Node.NoisePub[:],
317         }
318         hs, err := noise.NewHandshakeState(conf)
319         if err != nil {
320                 return err
321         }
322         state.hs = hs
323         state.payloads = make(chan []byte)
324         state.infosTheir = make(map[[32]byte]*SPInfo)
325         state.infosOurSeen = make(map[[32]byte]uint8)
326         state.started = started
327         state.rxLock = rxLock
328         state.txLock = txLock
329
330         var infosPayloads [][]byte
331         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
332                 infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
333         }
334         var firstPayload []byte
335         if len(infosPayloads) > 0 {
336                 firstPayload = infosPayloads[0]
337         }
338         // Pad first payload, to hide actual number of existing files
339         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
340                 firstPayload = append(firstPayload, SPHaltMarshalized...)
341         }
342
343         var buf []byte
344         var payload []byte
345         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
346         if err != nil {
347                 state.dirUnlock()
348                 return err
349         }
350         sds := SDS{"node": nodeId, "nice": int(state.Nice)}
351         state.Ctx.LogD("sp-start", sds, "sending first message")
352         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
353         if err = state.WriteSP(conn, buf); err != nil {
354                 state.Ctx.LogE("sp-start", sds, err, "")
355                 state.dirUnlock()
356                 return err
357         }
358         state.Ctx.LogD("sp-start", sds, "waiting for first message")
359         conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
360         if buf, err = state.ReadSP(conn); err != nil {
361                 state.Ctx.LogE("sp-start", sds, err, "")
362                 state.dirUnlock()
363                 return err
364         }
365         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
366         if err != nil {
367                 state.Ctx.LogE("sp-start", sds, err, "")
368                 state.dirUnlock()
369                 return err
370         }
371         state.Ctx.LogD("sp-start", sds, "starting workers")
372         err = state.StartWorkers(conn, infosPayloads, payload)
373         if err != nil {
374                 state.Ctx.LogE("sp-start", sds, err, "")
375                 state.dirUnlock()
376                 return err
377         }
378         return err
379 }
380
381 func (state *SPState) StartR(conn ConnDeadlined) error {
382         started := time.Now()
383         conf := noise.Config{
384                 CipherSuite: NoiseCipherSuite,
385                 Pattern:     noise.HandshakeIK,
386                 Initiator:   false,
387                 StaticKeypair: noise.DHKey{
388                         Private: state.Ctx.Self.NoisePrv[:],
389                         Public:  state.Ctx.Self.NoisePub[:],
390                 },
391         }
392         hs, err := noise.NewHandshakeState(conf)
393         if err != nil {
394                 return err
395         }
396         xxOnly := TRxTx("")
397         state.hs = hs
398         state.payloads = make(chan []byte)
399         state.infosOurSeen = make(map[[32]byte]uint8)
400         state.infosTheir = make(map[[32]byte]*SPInfo)
401         state.started = started
402         state.xxOnly = xxOnly
403         var buf []byte
404         var payload []byte
405         state.Ctx.LogD("sp-start", SDS{"nice": int(state.Nice)}, "waiting for first message")
406         conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
407         if buf, err = state.ReadSP(conn); err != nil {
408                 state.Ctx.LogE("sp-start", SDS{}, err, "")
409                 return err
410         }
411         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
412                 state.Ctx.LogE("sp-start", SDS{}, err, "")
413                 return err
414         }
415
416         var node *Node
417         for _, n := range state.Ctx.Neigh {
418                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
419                         node = n
420                         break
421                 }
422         }
423         if node == nil {
424                 peerId := ToBase32(state.hs.PeerStatic())
425                 state.Ctx.LogE("sp-start", SDS{"peer": peerId}, errors.New("unknown"), "")
426                 return errors.New("Unknown peer: " + peerId)
427         }
428         state.Node = node
429         state.rxRate = node.RxRate
430         state.txRate = node.TxRate
431         state.onlineDeadline = node.OnlineDeadline
432         state.maxOnlineTime = node.MaxOnlineTime
433         sds := SDS{"node": node.Id, "nice": int(state.Nice)}
434
435         if state.Ctx.ensureRxDir(node.Id); err != nil {
436                 return err
437         }
438         var rxLock *os.File
439         if xxOnly == "" || xxOnly == TRx {
440                 rxLock, err = state.Ctx.LockDir(node.Id, TRx)
441                 if err != nil {
442                         return err
443                 }
444         }
445         state.rxLock = rxLock
446         var txLock *os.File
447         if xxOnly == "" || xxOnly == TTx {
448                 txLock, err = state.Ctx.LockDir(node.Id, TTx)
449                 if err != nil {
450                         return err
451                 }
452         }
453         state.txLock = txLock
454
455         var infosPayloads [][]byte
456         if xxOnly == "" || xxOnly == TTx {
457                 infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
458         }
459         var firstPayload []byte
460         if len(infosPayloads) > 0 {
461                 firstPayload = infosPayloads[0]
462         }
463         // Pad first payload, to hide actual number of existing files
464         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
465                 firstPayload = append(firstPayload, SPHaltMarshalized...)
466         }
467
468         state.Ctx.LogD("sp-start", sds, "sending first message")
469         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
470         if err != nil {
471                 state.dirUnlock()
472                 return err
473         }
474         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
475         if err = state.WriteSP(conn, buf); err != nil {
476                 state.Ctx.LogE("sp-start", sds, err, "")
477                 state.dirUnlock()
478                 return err
479         }
480         state.Ctx.LogD("sp-start", sds, "starting workers")
481         err = state.StartWorkers(conn, infosPayloads, payload)
482         if err != nil {
483                 state.dirUnlock()
484                 return err
485         }
486         return err
487 }
488
489 func (state *SPState) StartWorkers(
490         conn ConnDeadlined,
491         infosPayloads [][]byte,
492         payload []byte) error {
493         sds := SDS{"node": state.Node.Id, "nice": int(state.Nice)}
494         if len(infosPayloads) > 1 {
495                 go func() {
496                         for _, payload := range infosPayloads[1:] {
497                                 state.Ctx.LogD(
498                                         "sp-work",
499                                         SdsAdd(sds, SDS{"size": len(payload)}),
500                                         "queuing remaining payload",
501                                 )
502                                 state.payloads <- payload
503                         }
504                 }()
505         }
506         state.Ctx.LogD(
507                 "sp-work",
508                 SdsAdd(sds, SDS{"size": len(payload)}),
509                 "processing first payload",
510         )
511         replies, err := state.ProcessSP(payload)
512         if err != nil {
513                 state.Ctx.LogE("sp-work", sds, err, "")
514                 return err
515         }
516
517         go func() {
518                 for _, reply := range replies {
519                         state.Ctx.LogD(
520                                 "sp-work",
521                                 SdsAdd(sds, SDS{"size": len(reply)}),
522                                 "queuing reply",
523                         )
524                         state.payloads <- reply
525                 }
526         }()
527
528         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
529                 go func() {
530                         for range time.Tick(time.Second) {
531                                 if state.NotAlive() {
532                                         return
533                                 }
534                                 for _, payload := range state.Ctx.infosOur(
535                                         state.Node.Id,
536                                         state.Nice,
537                                         &state.infosOurSeen,
538                                 ) {
539                                         state.Ctx.LogD(
540                                                 "sp-work",
541                                                 SdsAdd(sds, SDS{"size": len(payload)}),
542                                                 "queuing new info",
543                                         )
544                                         state.payloads <- payload
545                                 }
546                         }
547                 }()
548         }
549
550         state.wg.Add(1)
551         go func() {
552                 defer func() {
553                         state.isDead = true
554                         state.wg.Done()
555                 }()
556                 for {
557                         if state.NotAlive() {
558                                 return
559                         }
560                         var payload []byte
561                         select {
562                         case payload = <-state.payloads:
563                                 state.Ctx.LogD(
564                                         "sp-xmit",
565                                         SdsAdd(sds, SDS{"size": len(payload)}),
566                                         "got payload",
567                                 )
568                         default:
569                         }
570                         if payload == nil {
571                                 state.RLock()
572                                 if len(state.queueTheir) == 0 {
573                                         state.Ctx.LogD("sp-xmit", sds, "file queue is empty")
574                                         state.RUnlock()
575                                         time.Sleep(100 * time.Millisecond)
576                                         continue
577                                 }
578                                 freq := state.queueTheir[0].freq
579                                 state.RUnlock()
580
581                                 if state.txRate > 0 {
582                                         time.Sleep(time.Second / time.Duration(state.txRate))
583                                 }
584
585                                 sdsp := SdsAdd(sds, SDS{
586                                         "xx":   string(TTx),
587                                         "pkt":  ToBase32(freq.Hash[:]),
588                                         "size": int64(freq.Offset),
589                                 })
590                                 state.Ctx.LogD("sp-file", sdsp, "queueing")
591                                 fd, err := os.Open(filepath.Join(
592                                         state.Ctx.Spool,
593                                         state.Node.Id.String(),
594                                         string(TTx),
595                                         ToBase32(freq.Hash[:]),
596                                 ))
597                                 if err != nil {
598                                         state.Ctx.LogE("sp-file", sdsp, err, "")
599                                         break
600                                 }
601                                 fi, err := fd.Stat()
602                                 if err != nil {
603                                         state.Ctx.LogE("sp-file", sdsp, err, "")
604                                         break
605                                 }
606                                 fullSize := fi.Size()
607                                 var buf []byte
608                                 if freq.Offset < uint64(fullSize) {
609                                         state.Ctx.LogD("sp-file", sdsp, "seeking")
610                                         if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
611                                                 state.Ctx.LogE("sp-file", sdsp, err, "")
612                                                 break
613                                         }
614                                         buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
615                                         n, err := fd.Read(buf)
616                                         if err != nil {
617                                                 state.Ctx.LogE("sp-file", sdsp, err, "")
618                                                 break
619                                         }
620                                         buf = buf[:n]
621                                         state.Ctx.LogD(
622                                                 "sp-file",
623                                                 SdsAdd(sdsp, SDS{"size": n}),
624                                                 "read",
625                                         )
626                                 }
627                                 fd.Close()
628                                 payload = MarshalSP(SPTypeFile, SPFile{
629                                         Hash:    freq.Hash,
630                                         Offset:  freq.Offset,
631                                         Payload: buf,
632                                 })
633                                 ourSize := freq.Offset + uint64(len(buf))
634                                 sdsp["size"] = int64(ourSize)
635                                 sdsp["fullsize"] = fullSize
636                                 if state.Ctx.ShowPrgrs {
637                                         Progress(sdsp)
638                                 }
639                                 state.Lock()
640                                 if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
641                                         if ourSize == uint64(fullSize) {
642                                                 state.Ctx.LogD("sp-file", sdsp, "finished")
643                                                 if len(state.queueTheir) > 1 {
644                                                         state.queueTheir = state.queueTheir[1:]
645                                                 } else {
646                                                         state.queueTheir = state.queueTheir[:0]
647                                                 }
648                                         } else {
649                                                 state.queueTheir[0].freq.Offset += uint64(len(buf))
650                                         }
651                                 } else {
652                                         state.Ctx.LogD("sp-file", sdsp, "queue disappeared")
653                                 }
654                                 state.Unlock()
655                         }
656                         state.Ctx.LogD(
657                                 "sp-xmit",
658                                 SdsAdd(sds, SDS{"size": len(payload)}),
659                                 "sending",
660                         )
661                         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
662                         if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload)); err != nil {
663                                 state.Ctx.LogE("sp-xmit", sds, err, "")
664                                 break
665                         }
666                 }
667         }()
668
669         state.wg.Add(1)
670         go func() {
671                 defer func() {
672                         state.isDead = true
673                         state.wg.Done()
674                 }()
675                 for {
676                         if state.NotAlive() {
677                                 return
678                         }
679                         state.Ctx.LogD("sp-recv", sds, "waiting for payload")
680                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
681                         payload, err := state.ReadSP(conn)
682                         if err != nil {
683                                 unmarshalErr := err.(*xdr.UnmarshalError)
684                                 netErr, ok := unmarshalErr.Err.(net.Error)
685                                 if ok && netErr.Timeout() {
686                                         continue
687                                 }
688                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
689                                         break
690                                 }
691                                 state.Ctx.LogE("sp-recv", sds, err, "")
692                                 break
693                         }
694                         state.Ctx.LogD(
695                                 "sp-recv",
696                                 SdsAdd(sds, SDS{"size": len(payload)}),
697                                 "got payload",
698                         )
699                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
700                         if err != nil {
701                                 state.Ctx.LogE("sp-recv", sds, err, "")
702                                 break
703                         }
704                         state.Ctx.LogD(
705                                 "sp-recv",
706                                 SdsAdd(sds, SDS{"size": len(payload)}),
707                                 "processing",
708                         )
709                         replies, err := state.ProcessSP(payload)
710                         if err != nil {
711                                 state.Ctx.LogE("sp-recv", sds, err, "")
712                                 break
713                         }
714                         go func() {
715                                 for _, reply := range replies {
716                                         state.Ctx.LogD(
717                                                 "sp-recv",
718                                                 SdsAdd(sds, SDS{"size": len(reply)}),
719                                                 "queuing reply",
720                                         )
721                                         state.payloads <- reply
722                                 }
723                         }()
724                         if state.rxRate > 0 {
725                                 time.Sleep(time.Second / time.Duration(state.rxRate))
726                         }
727                 }
728         }()
729
730         return nil
731 }
732
733 func (state *SPState) Wait() {
734         state.wg.Wait()
735         state.dirUnlock()
736         state.Duration = time.Now().Sub(state.started)
737         state.RxSpeed = state.RxBytes
738         state.TxSpeed = state.TxBytes
739         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
740         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
741         if rxDuration > 0 {
742                 state.RxSpeed = state.RxBytes / rxDuration
743         }
744         if txDuration > 0 {
745                 state.TxSpeed = state.TxBytes / txDuration
746         }
747 }
748
749 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
750         sds := SDS{"node": state.Node.Id, "nice": int(state.Nice)}
751         r := bytes.NewReader(payload)
752         var err error
753         var replies [][]byte
754         var infosGot bool
755         for r.Len() > 0 {
756                 state.Ctx.LogD("sp-process", sds, "unmarshaling header")
757                 var head SPHead
758                 if _, err = xdr.Unmarshal(r, &head); err != nil {
759                         state.Ctx.LogE("sp-process", sds, err, "")
760                         return nil, err
761                 }
762                 switch head.Type {
763                 case SPTypeInfo:
764                         infosGot = true
765                         sdsp := SdsAdd(sds, SDS{"type": "info"})
766                         state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
767                         var info SPInfo
768                         if _, err = xdr.Unmarshal(r, &info); err != nil {
769                                 state.Ctx.LogE("sp-process", sdsp, err, "")
770                                 return nil, err
771                         }
772                         sdsp = SdsAdd(sds, SDS{
773                                 "pkt":  ToBase32(info.Hash[:]),
774                                 "size": int64(info.Size),
775                                 "nice": int(info.Nice),
776                         })
777                         if !state.listOnly && info.Nice > state.Nice {
778                                 state.Ctx.LogD("sp-process", sdsp, "too nice")
779                                 continue
780                         }
781                         state.Ctx.LogD("sp-process", sdsp, "received")
782                         if !state.listOnly && state.xxOnly == TTx {
783                                 continue
784                         }
785                         state.Lock()
786                         state.infosTheir[*info.Hash] = &info
787                         state.Unlock()
788                         state.Ctx.LogD("sp-process", sdsp, "stating part")
789                         pktPath := filepath.Join(
790                                 state.Ctx.Spool,
791                                 state.Node.Id.String(),
792                                 string(TRx),
793                                 ToBase32(info.Hash[:]),
794                         )
795                         if _, err = os.Stat(pktPath); err == nil {
796                                 state.Ctx.LogI("sp-info", sdsp, "already done")
797                                 if !state.listOnly {
798                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
799                                 }
800                                 continue
801                         }
802                         if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
803                                 state.Ctx.LogI("sp-info", sdsp, "already seen")
804                                 if !state.listOnly {
805                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
806                                 }
807                                 continue
808                         }
809                         fi, err := os.Stat(pktPath + PartSuffix)
810                         var offset int64
811                         if err == nil {
812                                 offset = fi.Size()
813                         }
814                         if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
815                                 state.Ctx.LogI("sp-info", sdsp, "not enough space")
816                                 continue
817                         }
818                         state.Ctx.LogI(
819                                 "sp-info",
820                                 SdsAdd(sdsp, SDS{"offset": offset}),
821                                 "",
822                         )
823                         if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
824                                 replies = append(replies, MarshalSP(
825                                         SPTypeFreq,
826                                         SPFreq{info.Hash, uint64(offset)},
827                                 ))
828                         }
829                 case SPTypeFile:
830                         sdsp := SdsAdd(sds, SDS{"type": "file"})
831                         state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
832                         var file SPFile
833                         if _, err = xdr.Unmarshal(r, &file); err != nil {
834                                 state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"type": "file"}), err, "")
835                                 return nil, err
836                         }
837                         sdsp["xx"] = string(TRx)
838                         sdsp["pkt"] = ToBase32(file.Hash[:])
839                         sdsp["size"] = len(file.Payload)
840                         dirToSync := filepath.Join(
841                                 state.Ctx.Spool,
842                                 state.Node.Id.String(),
843                                 string(TRx),
844                         )
845                         filePath := filepath.Join(dirToSync, ToBase32(file.Hash[:]))
846                         state.Ctx.LogD("sp-file", sdsp, "opening part")
847                         fd, err := os.OpenFile(
848                                 filePath+PartSuffix,
849                                 os.O_RDWR|os.O_CREATE,
850                                 os.FileMode(0666),
851                         )
852                         if err != nil {
853                                 state.Ctx.LogE("sp-file", sdsp, err, "")
854                                 return nil, err
855                         }
856                         state.Ctx.LogD(
857                                 "sp-file",
858                                 SdsAdd(sdsp, SDS{"offset": file.Offset}),
859                                 "seeking",
860                         )
861                         if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
862                                 state.Ctx.LogE("sp-file", sdsp, err, "")
863                                 fd.Close()
864                                 return nil, err
865                         }
866                         state.Ctx.LogD("sp-file", sdsp, "writing")
867                         _, err = fd.Write(file.Payload)
868                         if err != nil {
869                                 state.Ctx.LogE("sp-file", sdsp, err, "")
870                                 fd.Close()
871                                 return nil, err
872                         }
873                         ourSize := file.Offset + uint64(len(file.Payload))
874                         state.RLock()
875                         sdsp["size"] = int64(ourSize)
876                         sdsp["fullsize"] = int64(state.infosTheir[*file.Hash].Size)
877                         if state.Ctx.ShowPrgrs {
878                                 Progress(sdsp)
879                         }
880                         if state.infosTheir[*file.Hash].Size != ourSize {
881                                 state.RUnlock()
882                                 fd.Close()
883                                 continue
884                         }
885                         state.RUnlock()
886                         spWorkersGroup.Wait()
887                         spWorkersGroup.Add(1)
888                         go func() {
889                                 if err := fd.Sync(); err != nil {
890                                         state.Ctx.LogE("sp-file", sdsp, err, "sync")
891                                         fd.Close()
892                                         return
893                                 }
894                                 state.wg.Add(1)
895                                 defer state.wg.Done()
896                                 fd.Seek(0, io.SeekStart)
897                                 state.Ctx.LogD("sp-file", sdsp, "checking")
898                                 gut, err := Check(fd, file.Hash[:], sdsp, state.Ctx.ShowPrgrs)
899                                 fd.Close()
900                                 if err != nil || !gut {
901                                         state.Ctx.LogE("sp-file", sdsp, errors.New("checksum mismatch"), "")
902                                         return
903                                 }
904                                 state.Ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "")
905                                 if err = os.Rename(filePath+PartSuffix, filePath); err != nil {
906                                         state.Ctx.LogE("sp-file", sdsp, err, "rename")
907                                         return
908                                 }
909                                 if err = DirSync(dirToSync); err != nil {
910                                         state.Ctx.LogE("sp-file", sdsp, err, "sync")
911                                         return
912                                 }
913                                 state.Lock()
914                                 delete(state.infosTheir, *file.Hash)
915                                 state.Unlock()
916                                 spWorkersGroup.Done()
917                                 go func() {
918                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
919                                 }()
920                         }()
921                 case SPTypeDone:
922                         sdsp := SdsAdd(sds, SDS{"type": "done"})
923                         state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
924                         var done SPDone
925                         if _, err = xdr.Unmarshal(r, &done); err != nil {
926                                 state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"type": "done"}), err, "")
927                                 return nil, err
928                         }
929                         sdsp["pkt"] = ToBase32(done.Hash[:])
930                         state.Ctx.LogD("sp-done", sdsp, "removing")
931                         err := os.Remove(filepath.Join(
932                                 state.Ctx.Spool,
933                                 state.Node.Id.String(),
934                                 string(TTx),
935                                 ToBase32(done.Hash[:]),
936                         ))
937                         sdsp["xx"] = string(TTx)
938                         if err == nil {
939                                 state.Ctx.LogI("sp-done", sdsp, "")
940                         } else {
941                                 state.Ctx.LogE("sp-done", sdsp, err, "")
942                         }
943                 case SPTypeFreq:
944                         sdsp := SdsAdd(sds, SDS{"type": "freq"})
945                         state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
946                         var freq SPFreq
947                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
948                                 state.Ctx.LogE("sp-process", sdsp, err, "")
949                                 return nil, err
950                         }
951                         sdsp["pkt"] = ToBase32(freq.Hash[:])
952                         sdsp["offset"] = freq.Offset
953                         state.Ctx.LogD("sp-process", sdsp, "queueing")
954                         nice, exists := state.infosOurSeen[*freq.Hash]
955                         if exists {
956                                 if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
957                                         state.Lock()
958                                         insertIdx := 0
959                                         var freqWithNice *FreqWithNice
960                                         for insertIdx, freqWithNice = range state.queueTheir {
961                                                 if freqWithNice.nice > nice {
962                                                         break
963                                                 }
964                                         }
965                                         state.queueTheir = append(state.queueTheir, nil)
966                                         copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
967                                         state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
968                                         state.Unlock()
969                                 } else {
970                                         state.Ctx.LogD("sp-process", sdsp, "skipping")
971                                 }
972                         } else {
973                                 state.Ctx.LogD("sp-process", sdsp, "unknown")
974                         }
975                 case SPTypeHalt:
976                         state.Ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "halt"}), "")
977                         state.Lock()
978                         state.queueTheir = nil
979                         state.Unlock()
980                 default:
981                         state.Ctx.LogE(
982                                 "sp-process",
983                                 SdsAdd(sds, SDS{"type": head.Type}),
984                                 errors.New("unknown type"),
985                                 "",
986                         )
987                         return nil, BadPktType
988                 }
989         }
990         if infosGot {
991                 var pkts int
992                 var size uint64
993                 state.RLock()
994                 for _, info := range state.infosTheir {
995                         pkts++
996                         size += info.Size
997                 }
998                 state.RUnlock()
999                 state.Ctx.LogI("sp-infos", SDS{
1000                         "xx":   string(TRx),
1001                         "node": state.Node.Id,
1002                         "pkts": pkts,
1003                         "size": int64(size),
1004                 }, "")
1005         }
1006         return payloadsSplit(replies), nil
1007 }