]> Cypherpunks.ru repositories - nncp.git/blob - src/cypherpunks.ru/nncp/sp.go
ee54f748bcd074e3782ad41671ef818ab95b79a1
[nncp.git] / src / cypherpunks.ru / nncp / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2018 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 package nncp
20
21 import (
22         "bytes"
23         "crypto/subtle"
24         "errors"
25         "io"
26         "net"
27         "os"
28         "path/filepath"
29         "sort"
30         "strconv"
31         "sync"
32         "time"
33
34         "github.com/davecgh/go-xdr/xdr2"
35         "github.com/flynn/noise"
36 )
37
38 const (
39         MaxSPSize       = 1<<16 - 256
40         PartSuffix      = ".part"
41         DefaultDeadline = 10
42 )
43
44 var (
45         MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}
46
47         SPHeadOverhead    int
48         SPInfoOverhead    int
49         SPFreqOverhead    int
50         SPFileOverhead    int
51         SPHaltMarshalized []byte
52
53         NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
54                 noise.DH25519,
55                 noise.CipherChaChaPoly,
56                 noise.HashBLAKE2b,
57         )
58
59         spWorkersGroup sync.WaitGroup
60 )
61
62 type SPType uint8
63
64 const (
65         SPTypeInfo SPType = iota
66         SPTypeFreq SPType = iota
67         SPTypeFile SPType = iota
68         SPTypeDone SPType = iota
69         SPTypeHalt SPType = iota
70 )
71
72 type SPHead struct {
73         Type SPType
74 }
75
76 type SPInfo struct {
77         Nice uint8
78         Size uint64
79         Hash *[32]byte
80 }
81
82 type SPFreq struct {
83         Hash   *[32]byte
84         Offset uint64
85 }
86
87 type SPFile struct {
88         Hash    *[32]byte
89         Offset  uint64
90         Payload []byte
91 }
92
93 type SPDone struct {
94         Hash *[32]byte
95 }
96
97 type SPRaw struct {
98         Magic   [8]byte
99         Payload []byte
100 }
101
102 func init() {
103         var buf bytes.Buffer
104         spHead := SPHead{Type: SPTypeHalt}
105         if _, err := xdr.Marshal(&buf, spHead); err != nil {
106                 panic(err)
107         }
108         copy(SPHaltMarshalized, buf.Bytes())
109         SPHeadOverhead = buf.Len()
110         buf.Reset()
111
112         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
113         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
114                 panic(err)
115         }
116         SPInfoOverhead = buf.Len()
117         buf.Reset()
118
119         spFreq := SPFreq{Hash: new([32]byte), Offset: 123}
120         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
121                 panic(err)
122         }
123         SPFreqOverhead = buf.Len()
124         buf.Reset()
125
126         spFile := SPFile{Hash: new([32]byte), Offset: 123}
127         if _, err := xdr.Marshal(&buf, spFile); err != nil {
128                 panic(err)
129         }
130         SPFileOverhead = buf.Len()
131 }
132
133 func MarshalSP(typ SPType, sp interface{}) []byte {
134         var buf bytes.Buffer
135         var err error
136         if _, err = xdr.Marshal(&buf, SPHead{typ}); err != nil {
137                 panic(err)
138         }
139         if _, err = xdr.Marshal(&buf, sp); err != nil {
140                 panic(err)
141         }
142         return buf.Bytes()
143 }
144
145 func payloadsSplit(payloads [][]byte) [][]byte {
146         var outbounds [][]byte
147         outbound := make([]byte, 0, MaxSPSize)
148         for i, payload := range payloads {
149                 outbound = append(outbound, payload...)
150                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
151                         outbounds = append(outbounds, outbound)
152                         outbound = make([]byte, 0, MaxSPSize)
153                 }
154         }
155         if len(outbound) > 0 {
156                 outbounds = append(outbounds, outbound)
157         }
158         return outbounds
159 }
160
161 type SPState struct {
162         ctx            *Ctx
163         Node           *Node
164         onlineDeadline uint
165         maxOnlineTime  uint
166         nice           uint8
167         hs             *noise.HandshakeState
168         csOur          *noise.CipherState
169         csTheir        *noise.CipherState
170         payloads       chan []byte
171         infosTheir     map[[32]byte]*SPInfo
172         infosOurSeen   map[[32]byte]struct{}
173         queueTheir     []*SPFreq
174         wg             sync.WaitGroup
175         RxBytes        int64
176         RxLastSeen     time.Time
177         TxBytes        int64
178         TxLastSeen     time.Time
179         started        time.Time
180         Duration       time.Duration
181         RxSpeed        int64
182         TxSpeed        int64
183         rxLock         *os.File
184         txLock         *os.File
185         xxOnly         TRxTx
186         isDead         bool
187         sync.RWMutex
188 }
189
190 func (state *SPState) NotAlive() bool {
191         if state.isDead {
192                 return true
193         }
194         now := time.Now()
195         if state.maxOnlineTime > 0 && state.started.Add(time.Duration(state.maxOnlineTime)*time.Second).Before(now) {
196                 return true
197         }
198         return uint(now.Sub(state.RxLastSeen).Seconds()) >= state.onlineDeadline && uint(now.Sub(state.TxLastSeen).Seconds()) >= state.onlineDeadline
199 }
200
201 func (state *SPState) dirUnlock() {
202         state.ctx.UnlockDir(state.rxLock)
203         state.ctx.UnlockDir(state.txLock)
204 }
205
206 func (state *SPState) WriteSP(dst io.Writer, payload []byte) error {
207         n, err := xdr.Marshal(dst, SPRaw{Magic: MagicNNCPLv1, Payload: payload})
208         if err == nil {
209                 state.TxLastSeen = time.Now()
210                 state.TxBytes += int64(n)
211         }
212         return err
213 }
214
215 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
216         var sp SPRaw
217         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
218         if err != nil {
219                 return nil, err
220         }
221         state.RxLastSeen = time.Now()
222         state.RxBytes += int64(n)
223         if sp.Magic != MagicNNCPLv1 {
224                 return nil, BadMagic
225         }
226         return sp.Payload, nil
227 }
228
229 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]struct{}) [][]byte {
230         var infos []*SPInfo
231         var totalSize int64
232         for job := range ctx.Jobs(nodeId, TTx) {
233                 job.Fd.Close()
234                 if job.PktEnc.Nice > nice {
235                         continue
236                 }
237                 if _, known := (*seen)[*job.HshValue]; known {
238                         continue
239                 }
240                 totalSize += job.Size
241                 infos = append(infos, &SPInfo{
242                         Nice: job.PktEnc.Nice,
243                         Size: uint64(job.Size),
244                         Hash: job.HshValue,
245                 })
246                 (*seen)[*job.HshValue] = struct{}{}
247         }
248         sort.Sort(ByNice(infos))
249         var payloads [][]byte
250         for _, info := range infos {
251                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
252                 ctx.LogD("sp-info-our", SDS{
253                         "node": nodeId,
254                         "name": ToBase32(info.Hash[:]),
255                         "size": strconv.FormatInt(int64(info.Size), 10),
256                 }, "")
257         }
258         if totalSize > 0 {
259                 ctx.LogI("sp-infos", SDS{
260                         "xx":   string(TTx),
261                         "node": nodeId,
262                         "pkts": strconv.Itoa(len(payloads)),
263                         "size": strconv.FormatInt(totalSize, 10),
264                 }, "")
265         }
266         return payloadsSplit(payloads)
267 }
268
269 func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly TRxTx, onlineDeadline, maxOnlineTime uint) (*SPState, error) {
270         err := ctx.ensureRxDir(nodeId)
271         if err != nil {
272                 return nil, err
273         }
274         var rxLock *os.File
275         if xxOnly == "" || xxOnly == TRx {
276                 rxLock, err = ctx.LockDir(nodeId, TRx)
277                 if err != nil {
278                         return nil, err
279                 }
280         }
281         var txLock *os.File
282         if xxOnly == "" || xxOnly == TTx {
283                 txLock, err = ctx.LockDir(nodeId, TTx)
284                 if err != nil {
285                         return nil, err
286                 }
287         }
288         started := time.Now()
289         node := ctx.Neigh[*nodeId]
290         conf := noise.Config{
291                 CipherSuite: NoiseCipherSuite,
292                 Pattern:     noise.HandshakeIK,
293                 Initiator:   true,
294                 StaticKeypair: noise.DHKey{
295                         Private: ctx.Self.NoisePrv[:],
296                         Public:  ctx.Self.NoisePub[:],
297                 },
298                 PeerStatic: node.NoisePub[:],
299         }
300         state := SPState{
301                 ctx:            ctx,
302                 hs:             noise.NewHandshakeState(conf),
303                 Node:           node,
304                 onlineDeadline: onlineDeadline,
305                 maxOnlineTime:  maxOnlineTime,
306                 nice:           nice,
307                 payloads:       make(chan []byte),
308                 infosTheir:     make(map[[32]byte]*SPInfo),
309                 infosOurSeen:   make(map[[32]byte]struct{}),
310                 started:        started,
311                 rxLock:         rxLock,
312                 txLock:         txLock,
313                 xxOnly:         xxOnly,
314         }
315
316         var infosPayloads [][]byte
317         if xxOnly == "" || xxOnly == TTx {
318                 infosPayloads = ctx.infosOur(nodeId, nice, &state.infosOurSeen)
319         }
320         var firstPayload []byte
321         if len(infosPayloads) > 0 {
322                 firstPayload = infosPayloads[0]
323         }
324         // Pad first payload, to hide actual number of existing files
325         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
326                 firstPayload = append(firstPayload, SPHaltMarshalized...)
327         }
328
329         var buf []byte
330         var payload []byte
331         buf, _, _ = state.hs.WriteMessage(nil, firstPayload)
332         sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(nice))}
333         ctx.LogD("sp-start", sds, "sending first message")
334         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
335         if err = state.WriteSP(conn, buf); err != nil {
336                 ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
337                 state.dirUnlock()
338                 return nil, err
339         }
340         ctx.LogD("sp-start", sds, "waiting for first message")
341         conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
342         if buf, err = state.ReadSP(conn); err != nil {
343                 ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
344                 state.dirUnlock()
345                 return nil, err
346         }
347         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
348         if err != nil {
349                 ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
350                 state.dirUnlock()
351                 return nil, err
352         }
353         ctx.LogD("sp-start", sds, "starting workers")
354         err = state.StartWorkers(conn, infosPayloads, payload)
355         if err != nil {
356                 ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
357                 state.dirUnlock()
358                 return nil, err
359         }
360         return &state, err
361 }
362
363 func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly TRxTx) (*SPState, error) {
364         started := time.Now()
365         conf := noise.Config{
366                 CipherSuite: NoiseCipherSuite,
367                 Pattern:     noise.HandshakeIK,
368                 Initiator:   false,
369                 StaticKeypair: noise.DHKey{
370                         Private: ctx.Self.NoisePrv[:],
371                         Public:  ctx.Self.NoisePub[:],
372                 },
373         }
374         state := SPState{
375                 ctx:          ctx,
376                 hs:           noise.NewHandshakeState(conf),
377                 nice:         nice,
378                 payloads:     make(chan []byte),
379                 infosOurSeen: make(map[[32]byte]struct{}),
380                 infosTheir:   make(map[[32]byte]*SPInfo),
381                 started:      started,
382                 xxOnly:       xxOnly,
383         }
384         var buf []byte
385         var payload []byte
386         var err error
387         ctx.LogD(
388                 "sp-start",
389                 SDS{"nice": strconv.Itoa(int(nice))},
390                 "waiting for first message",
391         )
392         conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
393         if buf, err = state.ReadSP(conn); err != nil {
394                 ctx.LogE("sp-start", SDS{"err": err}, "")
395                 return nil, err
396         }
397         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
398                 ctx.LogE("sp-start", SDS{"err": err}, "")
399                 return nil, err
400         }
401
402         var node *Node
403         for _, node = range ctx.Neigh {
404                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), node.NoisePub[:]) == 1 {
405                         break
406                 }
407         }
408         if node == nil {
409                 peerId := ToBase32(state.hs.PeerStatic())
410                 ctx.LogE("sp-start", SDS{"peer": peerId}, "unknown")
411                 return nil, errors.New("Unknown peer: " + peerId)
412         }
413         state.Node = node
414         state.onlineDeadline = node.OnlineDeadline
415         state.maxOnlineTime = node.MaxOnlineTime
416         sds := SDS{"node": node.Id, "nice": strconv.Itoa(int(nice))}
417
418         if ctx.ensureRxDir(node.Id); err != nil {
419                 return nil, err
420         }
421         var rxLock *os.File
422         if xxOnly == "" || xxOnly == TRx {
423                 rxLock, err = ctx.LockDir(node.Id, TRx)
424                 if err != nil {
425                         return nil, err
426                 }
427         }
428         state.rxLock = rxLock
429         var txLock *os.File
430         if xxOnly == "" || xxOnly == TTx {
431                 txLock, err = ctx.LockDir(node.Id, TTx)
432                 if err != nil {
433                         return nil, err
434                 }
435         }
436         state.txLock = txLock
437
438         var infosPayloads [][]byte
439         if xxOnly == "" || xxOnly == TTx {
440                 infosPayloads = ctx.infosOur(node.Id, nice, &state.infosOurSeen)
441         }
442         var firstPayload []byte
443         if len(infosPayloads) > 0 {
444                 firstPayload = infosPayloads[0]
445         }
446         // Pad first payload, to hide actual number of existing files
447         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
448                 firstPayload = append(firstPayload, SPHaltMarshalized...)
449         }
450
451         ctx.LogD("sp-start", sds, "sending first message")
452         buf, state.csTheir, state.csOur = state.hs.WriteMessage(nil, firstPayload)
453         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
454         if err = state.WriteSP(conn, buf); err != nil {
455                 ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
456                 state.dirUnlock()
457                 return nil, err
458         }
459         ctx.LogD("sp-start", sds, "starting workers")
460         err = state.StartWorkers(conn, infosPayloads, payload)
461         if err != nil {
462                 state.dirUnlock()
463                 return nil, err
464         }
465         return &state, err
466 }
467
468 func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payload []byte) error {
469         sds := SDS{"node": state.Node.Id, "nice": strconv.Itoa(int(state.nice))}
470         if len(infosPayloads) > 1 {
471                 go func() {
472                         for _, payload := range infosPayloads[1:] {
473                                 state.ctx.LogD(
474                                         "sp-work",
475                                         SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
476                                         "queuing remaining payload",
477                                 )
478                                 state.payloads <- payload
479                         }
480                 }()
481         }
482         state.ctx.LogD(
483                 "sp-work",
484                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
485                 "processing first payload",
486         )
487         replies, err := state.ProcessSP(payload)
488         if err != nil {
489                 state.ctx.LogE("sp-work", SdsAdd(sds, SDS{"err": err}), "")
490                 return err
491         }
492
493         go func() {
494                 for _, reply := range replies {
495                         state.ctx.LogD(
496                                 "sp-work",
497                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}),
498                                 "queuing reply",
499                         )
500                         state.payloads <- reply
501                 }
502         }()
503
504         if state.xxOnly == "" || state.xxOnly == TTx {
505                 go func() {
506                         for range time.Tick(time.Second) {
507                                 for _, payload := range state.ctx.infosOur(
508                                         state.Node.Id,
509                                         state.nice,
510                                         &state.infosOurSeen,
511                                 ) {
512                                         state.ctx.LogD(
513                                                 "sp-work",
514                                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
515                                                 "queuing new info",
516                                         )
517                                         state.payloads <- payload
518                                 }
519                         }
520                 }()
521         }
522
523         state.wg.Add(1)
524         go func() {
525                 defer func() {
526                         state.isDead = true
527                         state.wg.Done()
528                 }()
529                 for {
530                         if state.NotAlive() {
531                                 return
532                         }
533                         var payload []byte
534                         select {
535                         case payload = <-state.payloads:
536                                 state.ctx.LogD(
537                                         "sp-xmit",
538                                         SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
539                                         "got payload",
540                                 )
541                         default:
542                         }
543                         if payload == nil {
544                                 state.RLock()
545                                 if len(state.queueTheir) == 0 {
546                                         state.ctx.LogD("sp-xmit", sds, "file queue is empty")
547                                         state.RUnlock()
548                                         time.Sleep(100 * time.Millisecond)
549                                         continue
550                                 }
551                                 freq := state.queueTheir[0]
552                                 state.RUnlock()
553                                 sdsp := SdsAdd(sds, SDS{
554                                         "xx":   string(TTx),
555                                         "hash": ToBase32(freq.Hash[:]),
556                                         "size": strconv.FormatInt(int64(freq.Offset), 10),
557                                 })
558                                 state.ctx.LogD("sp-file", sdsp, "queueing")
559                                 fd, err := os.Open(filepath.Join(
560                                         state.ctx.Spool,
561                                         state.Node.Id.String(),
562                                         string(TTx),
563                                         ToBase32(freq.Hash[:]),
564                                 ))
565                                 if err != nil {
566                                         state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
567                                         break
568                                 }
569                                 fi, err := fd.Stat()
570                                 if err != nil {
571                                         state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
572                                         break
573                                 }
574                                 fullSize := uint64(fi.Size())
575                                 var buf []byte
576                                 if freq.Offset < fullSize {
577                                         state.ctx.LogD("sp-file", sdsp, "seeking")
578                                         if _, err = fd.Seek(int64(freq.Offset), 0); err != nil {
579                                                 state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
580                                                 break
581                                         }
582                                         buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
583                                         n, err := fd.Read(buf)
584                                         if err != nil {
585                                                 state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
586                                                 break
587                                         }
588                                         buf = buf[:n]
589                                         state.ctx.LogD(
590                                                 "sp-file",
591                                                 SdsAdd(sdsp, SDS{"size": strconv.Itoa(n)}),
592                                                 "read",
593                                         )
594                                 }
595                                 fd.Close()
596                                 payload = MarshalSP(SPTypeFile, SPFile{
597                                         Hash:    freq.Hash,
598                                         Offset:  freq.Offset,
599                                         Payload: buf,
600                                 })
601                                 ourSize := freq.Offset + uint64(len(buf))
602                                 sdsp["size"] = strconv.FormatInt(int64(ourSize), 10)
603                                 sdsp["fullsize"] = strconv.FormatInt(int64(fullSize), 10)
604                                 state.ctx.LogP("sp-file", sdsp, "")
605                                 state.Lock()
606                                 if len(state.queueTheir) > 0 && *state.queueTheir[0].Hash == *freq.Hash {
607                                         if ourSize == fullSize {
608                                                 state.ctx.LogD("sp-file", sdsp, "finished")
609                                                 if len(state.queueTheir) > 1 {
610                                                         state.queueTheir = state.queueTheir[1:]
611                                                 } else {
612                                                         state.queueTheir = state.queueTheir[:0]
613                                                 }
614                                         } else {
615                                                 state.queueTheir[0].Offset += uint64(len(buf))
616                                         }
617                                 } else {
618                                         state.ctx.LogD("sp-file", sdsp, "queue disappeared")
619                                 }
620                                 state.Unlock()
621                         }
622                         state.ctx.LogD(
623                                 "sp-xmit",
624                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
625                                 "sending",
626                         )
627                         conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
628                         if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload)); err != nil {
629                                 state.ctx.LogE("sp-xmit", SdsAdd(sds, SDS{"err": err}), "")
630                                 break
631                         }
632                 }
633         }()
634
635         state.wg.Add(1)
636         go func() {
637                 defer func() {
638                         state.isDead = true
639                         state.wg.Done()
640                 }()
641                 for {
642                         if state.NotAlive() {
643                                 return
644                         }
645                         state.ctx.LogD("sp-recv", sds, "waiting for payload")
646                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
647                         payload, err := state.ReadSP(conn)
648                         if err != nil {
649                                 unmarshalErr := err.(*xdr.UnmarshalError)
650                                 netErr, ok := unmarshalErr.Err.(net.Error)
651                                 if ok && netErr.Timeout() {
652                                         continue
653                                 }
654                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
655                                         break
656                                 }
657                                 state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "")
658                                 break
659                         }
660                         state.ctx.LogD(
661                                 "sp-recv",
662                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
663                                 "got payload",
664                         )
665                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
666                         if err != nil {
667                                 state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "")
668                                 break
669                         }
670                         state.ctx.LogD(
671                                 "sp-recv",
672                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
673                                 "processing",
674                         )
675                         replies, err := state.ProcessSP(payload)
676                         if err != nil {
677                                 state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "")
678                                 break
679                         }
680                         go func() {
681                                 for _, reply := range replies {
682                                         state.ctx.LogD(
683                                                 "sp-recv",
684                                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}),
685                                                 "queuing reply",
686                                         )
687                                         state.payloads <- reply
688                                 }
689                         }()
690                 }
691         }()
692
693         return nil
694 }
695
696 func (state *SPState) Wait() {
697         state.wg.Wait()
698         state.dirUnlock()
699         state.Duration = time.Now().Sub(state.started)
700         state.RxSpeed = state.RxBytes
701         state.TxSpeed = state.TxBytes
702         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
703         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
704         if rxDuration > 0 {
705                 state.RxSpeed = state.RxBytes / rxDuration
706         }
707         if txDuration > 0 {
708                 state.TxSpeed = state.TxBytes / txDuration
709         }
710 }
711
712 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
713         sds := SDS{"node": state.Node.Id, "nice": strconv.Itoa(int(state.nice))}
714         r := bytes.NewReader(payload)
715         var err error
716         var replies [][]byte
717         var infosGot bool
718         for r.Len() > 0 {
719                 state.ctx.LogD("sp-process", sds, "unmarshaling header")
720                 var head SPHead
721                 if _, err = xdr.Unmarshal(r, &head); err != nil {
722                         state.ctx.LogE("sp-process", SdsAdd(sds, SDS{"err": err}), "")
723                         return nil, err
724                 }
725                 switch head.Type {
726                 case SPTypeInfo:
727                         infosGot = true
728                         sdsp := SdsAdd(sds, SDS{"type": "info"})
729                         state.ctx.LogD("sp-process", sdsp, "unmarshaling packet")
730                         var info SPInfo
731                         if _, err = xdr.Unmarshal(r, &info); err != nil {
732                                 state.ctx.LogE("sp-process", SdsAdd(sdsp, SDS{"err": err}), "")
733                                 return nil, err
734                         }
735                         sdsp = SdsAdd(sds, SDS{
736                                 "hash": ToBase32(info.Hash[:]),
737                                 "size": strconv.FormatInt(int64(info.Size), 10),
738                         })
739                         if info.Nice > state.nice {
740                                 state.ctx.LogD("sp-process", sdsp, "too nice")
741                                 continue
742                         }
743                         state.ctx.LogD("sp-process", sdsp, "received")
744                         if state.xxOnly == TTx {
745                                 continue
746                         }
747                         state.Lock()
748                         state.infosTheir[*info.Hash] = &info
749                         state.Unlock()
750                         state.ctx.LogD("sp-process", sdsp, "stating part")
751                         if _, err = os.Stat(filepath.Join(
752                                 state.ctx.Spool,
753                                 state.Node.Id.String(),
754                                 string(TRx),
755                                 ToBase32(info.Hash[:]),
756                         )); err == nil {
757                                 state.ctx.LogD("sp-process", sdsp, "already done")
758                                 replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
759                                 continue
760                         }
761                         fi, err := os.Stat(filepath.Join(
762                                 state.ctx.Spool,
763                                 state.Node.Id.String(),
764                                 string(TRx),
765                                 ToBase32(info.Hash[:])+PartSuffix,
766                         ))
767                         var offset int64
768                         if err == nil {
769                                 offset = fi.Size()
770                                 state.ctx.LogD(
771                                         "sp-process",
772                                         SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(offset, 10)}),
773                                         "part exists",
774                                 )
775                         }
776                         replies = append(replies, MarshalSP(
777                                 SPTypeFreq,
778                                 SPFreq{info.Hash, uint64(offset)},
779                         ))
780                 case SPTypeFile:
781                         state.ctx.LogD(
782                                 "sp-process",
783                                 SdsAdd(sds, SDS{"type": "file"}),
784                                 "unmarshaling packet",
785                         )
786                         var file SPFile
787                         if _, err = xdr.Unmarshal(r, &file); err != nil {
788                                 state.ctx.LogE("sp-process", SdsAdd(sds, SDS{
789                                         "err":  err,
790                                         "type": "file",
791                                 }), "")
792                                 return nil, err
793                         }
794                         sdsp := SdsAdd(sds, SDS{
795                                 "xx":   string(TRx),
796                                 "hash": ToBase32(file.Hash[:]),
797                                 "size": strconv.Itoa(len(file.Payload)),
798                         })
799                         filePath := filepath.Join(
800                                 state.ctx.Spool,
801                                 state.Node.Id.String(),
802                                 string(TRx),
803                                 ToBase32(file.Hash[:]),
804                         )
805                         state.ctx.LogD("sp-file", sdsp, "opening part")
806                         fd, err := os.OpenFile(
807                                 filePath+PartSuffix,
808                                 os.O_RDWR|os.O_CREATE,
809                                 os.FileMode(0600),
810                         )
811                         if err != nil {
812                                 state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
813                                 return nil, err
814                         }
815                         state.ctx.LogD(
816                                 "sp-file",
817                                 SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(int64(file.Offset), 10)}),
818                                 "seeking",
819                         )
820                         if _, err = fd.Seek(int64(file.Offset), 0); err != nil {
821                                 state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
822                                 fd.Close()
823                                 return nil, err
824                         }
825                         state.ctx.LogD("sp-file", sdsp, "writing")
826                         _, err = fd.Write(file.Payload)
827                         if err != nil {
828                                 state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
829                                 fd.Close()
830                                 return nil, err
831                         }
832                         ourSize := uint64(file.Offset) + uint64(len(file.Payload))
833                         state.RLock()
834                         sdsp["fullsize"] = strconv.FormatInt(int64(state.infosTheir[*file.Hash].Size), 10)
835                         sdsp["size"] = strconv.FormatInt(int64(ourSize), 10)
836                         state.ctx.LogP("sp-file", sdsp, "")
837                         if state.infosTheir[*file.Hash].Size != ourSize {
838                                 state.RUnlock()
839                                 fd.Close()
840                                 continue
841                         }
842                         state.RUnlock()
843                         spWorkersGroup.Wait()
844                         spWorkersGroup.Add(1)
845                         go func() {
846                                 if err := fd.Sync(); err != nil {
847                                         state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "sync")
848                                         fd.Close()
849                                         return
850                                 }
851                                 state.wg.Add(1)
852                                 defer state.wg.Done()
853                                 fd.Seek(0, 0)
854                                 state.ctx.LogD("sp-file", sdsp, "checking")
855                                 gut, err := Check(fd, file.Hash[:])
856                                 fd.Close()
857                                 if err != nil || !gut {
858                                         state.ctx.LogE("sp-file", sdsp, "checksum mismatch")
859                                         return
860                                 }
861                                 state.ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "")
862                                 os.Rename(filePath+PartSuffix, filePath)
863                                 state.Lock()
864                                 delete(state.infosTheir, *file.Hash)
865                                 state.Unlock()
866                                 spWorkersGroup.Done()
867                                 go func() {
868                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
869                                 }()
870                         }()
871                 case SPTypeDone:
872                         state.ctx.LogD(
873                                 "sp-process",
874                                 SdsAdd(sds, SDS{"type": "done"}),
875                                 "unmarshaling packet",
876                         )
877                         var done SPDone
878                         if _, err = xdr.Unmarshal(r, &done); err != nil {
879                                 state.ctx.LogE("sp-process", SdsAdd(sds, SDS{
880                                         "type": "done",
881                                         "err":  err,
882                                 }), "")
883                                 return nil, err
884                         }
885                         sdsp := SdsAdd(sds, SDS{"hash": ToBase32(done.Hash[:])})
886                         state.ctx.LogD("sp-done", sdsp, "removing")
887                         err := os.Remove(filepath.Join(
888                                 state.ctx.Spool,
889                                 state.Node.Id.String(),
890                                 string(TTx),
891                                 ToBase32(done.Hash[:]),
892                         ))
893                         if err == nil {
894                                 state.ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TTx)}), "")
895                         } else {
896                                 state.ctx.LogE("sp-done", SdsAdd(sdsp, SDS{"xx": string(TTx)}), "")
897                         }
898                 case SPTypeFreq:
899                         sdsp := SdsAdd(sds, SDS{"type": "freq"})
900                         state.ctx.LogD("sp-process", sdsp, "unmarshaling packet")
901                         var freq SPFreq
902                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
903                                 state.ctx.LogE("sp-process", SdsAdd(sdsp, SDS{"err": err}), "")
904                                 return nil, err
905                         }
906                         state.ctx.LogD("sp-process", SdsAdd(sdsp, SDS{
907                                 "hash":   ToBase32(freq.Hash[:]),
908                                 "offset": strconv.FormatInt(int64(freq.Offset), 10),
909                         }), "queueing")
910                         state.Lock()
911                         state.queueTheir = append(state.queueTheir, &freq)
912                         state.Unlock()
913                 case SPTypeHalt:
914                         sdsp := SdsAdd(sds, SDS{"type": "halt"})
915                         state.ctx.LogD("sp-process", sdsp, "")
916                         state.Lock()
917                         state.queueTheir = nil
918                         state.Unlock()
919                 default:
920                         state.ctx.LogE(
921                                 "sp-process",
922                                 SdsAdd(sds, SDS{"type": head.Type}),
923                                 "unknown",
924                         )
925                         return nil, BadPktType
926                 }
927         }
928         if infosGot {
929                 var pkts int
930                 var size uint64
931                 state.RLock()
932                 for _, info := range state.infosTheir {
933                         pkts++
934                         size += info.Size
935                 }
936                 state.RUnlock()
937                 state.ctx.LogI("sp-infos", SDS{
938                         "xx":   string(TRx),
939                         "node": state.Node.Id,
940                         "pkts": strconv.Itoa(pkts),
941                         "size": strconv.FormatInt(int64(size), 10),
942                 }, "")
943         }
944         return payloadsSplit(replies), nil
945 }