]> Cypherpunks.ru repositories - nncp.git/blob - src/cypherpunks.ru/nncp/llp.go
Initial
[nncp.git] / src / cypherpunks.ru / nncp / llp.go
1 /*
2 NNCP -- Node-to-Node CoPy
3 Copyright (C) 2016-2017 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 package nncp
20
21 import (
22         "bytes"
23         "crypto/subtle"
24         "errors"
25         "io"
26         "net"
27         "os"
28         "path/filepath"
29         "sort"
30         "strconv"
31         "sync"
32         "time"
33
34         "github.com/davecgh/go-xdr/xdr2"
35         "github.com/flynn/noise"
36 )
37
38 const (
39         MaxLLPSize = 2<<15 - 256
40         PartSuffix = ".part"
41 )
42
43 var (
44         MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'L', 1, 0, 0}
45
46         LLPHeadOverhead    int
47         LLPInfoOverhead    int
48         LLPFreqOverhead    int
49         LLPFileOverhead    int
50         LLPHaltMarshalized []byte
51
52         NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
53                 noise.DH25519,
54                 noise.CipherChaChaPoly,
55                 noise.HashBLAKE2b,
56         )
57
58         DeadlineDuration time.Duration = 10 * time.Second
59 )
60
61 type LLPType uint8
62
63 const (
64         LLPTypeInfo LLPType = iota
65         LLPTypeFreq LLPType = iota
66         LLPTypeFile LLPType = iota
67         LLPTypeDone LLPType = iota
68         LLPTypeHalt LLPType = iota
69 )
70
71 type LLPHead struct {
72         Type LLPType
73 }
74
75 type LLPInfo struct {
76         Nice uint8
77         Size uint64
78         Hash *[32]byte
79 }
80
81 type LLPFreq struct {
82         Hash   *[32]byte
83         Offset uint64
84 }
85
86 type LLPFile struct {
87         Hash    *[32]byte
88         Offset  uint64
89         Payload []byte
90 }
91
92 type LLPDone struct {
93         Hash *[32]byte
94 }
95
96 type LLPRaw struct {
97         Magic   [8]byte
98         Payload []byte
99 }
100
101 func init() {
102         var buf bytes.Buffer
103         llpHead := LLPHead{Type: LLPTypeHalt}
104         if _, err := xdr.Marshal(&buf, llpHead); err != nil {
105                 panic(err)
106         }
107         copy(LLPHaltMarshalized, buf.Bytes())
108         LLPHeadOverhead = buf.Len()
109         buf.Reset()
110
111         llpInfo := LLPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
112         if _, err := xdr.Marshal(&buf, llpInfo); err != nil {
113                 panic(err)
114         }
115         LLPInfoOverhead = buf.Len()
116         buf.Reset()
117
118         llpFreq := LLPFreq{Hash: new([32]byte), Offset: 123}
119         if _, err := xdr.Marshal(&buf, llpFreq); err != nil {
120                 panic(err)
121         }
122         LLPFreqOverhead = buf.Len()
123         buf.Reset()
124
125         llpFile := LLPFile{Hash: new([32]byte), Offset: 123}
126         if _, err := xdr.Marshal(&buf, llpFile); err != nil {
127                 panic(err)
128         }
129         LLPFileOverhead = buf.Len()
130 }
131
132 func MarshalLLP(typ LLPType, llp interface{}) []byte {
133         var buf bytes.Buffer
134         var err error
135         if _, err = xdr.Marshal(&buf, LLPHead{typ}); err != nil {
136                 panic(err)
137         }
138         if _, err = xdr.Marshal(&buf, llp); err != nil {
139                 panic(err)
140         }
141         return buf.Bytes()
142 }
143
144 func payloadsSplit(payloads [][]byte) [][]byte {
145         var outbounds [][]byte
146         outbound := make([]byte, 0, MaxLLPSize)
147         for i, payload := range payloads {
148                 outbound = append(outbound, payload...)
149                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxLLPSize {
150                         outbounds = append(outbounds, outbound)
151                         outbound = make([]byte, 0, MaxLLPSize)
152                 }
153         }
154         if len(outbound) > 0 {
155                 outbounds = append(outbounds, outbound)
156         }
157         return outbounds
158 }
159
160 type LLPState struct {
161         ctx        *Ctx
162         NodeId     *NodeId
163         nice       uint8
164         hs         *noise.HandshakeState
165         csOur      *noise.CipherState
166         csTheir    *noise.CipherState
167         payloads   chan []byte
168         infosTheir map[[32]byte]*LLPInfo
169         queueTheir []*LLPFreq
170         isDead     bool
171         wg         sync.WaitGroup
172         RxBytes    int64
173         RxLastSeen time.Time
174         TxBytes    int64
175         TxLastSeen time.Time
176         started    time.Time
177         Duration   time.Duration
178         RxSpeed    int64
179         TxSpeed    int64
180         rxLock     *os.File
181         txLock     *os.File
182         xxOnly     *TRxTx
183         sync.RWMutex
184 }
185
186 func (state *LLPState) dirUnlock() {
187         state.ctx.UnlockDir(state.rxLock)
188         state.ctx.UnlockDir(state.txLock)
189 }
190
191 func (state *LLPState) WriteLLP(dst io.Writer, payload []byte) error {
192         n, err := xdr.Marshal(dst, LLPRaw{Magic: MagicNNCPLv1, Payload: payload})
193         if err == nil {
194                 state.TxLastSeen = time.Now()
195                 state.TxBytes += int64(n)
196         }
197         return err
198 }
199
200 func (state *LLPState) ReadLLP(src io.Reader) ([]byte, error) {
201         var llp LLPRaw
202         n, err := xdr.Unmarshal(src, &llp)
203         if err != nil {
204                 return nil, err
205         }
206         state.RxLastSeen = time.Now()
207         state.RxBytes += int64(n)
208         if llp.Magic != MagicNNCPLv1 {
209                 return nil, BadMagic
210         }
211         return llp.Payload, nil
212 }
213
214 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8) [][]byte {
215         var infos []*LLPInfo
216         var totalSize int64
217         for job := range ctx.Jobs(nodeId, TTx) {
218                 job.Fd.Close()
219                 if job.PktEnc.Nice > nice {
220                         continue
221                 }
222                 totalSize += job.Size
223                 infos = append(infos, &LLPInfo{
224                         Nice: job.PktEnc.Nice,
225                         Size: uint64(job.Size),
226                         Hash: job.HshValue,
227                 })
228         }
229         sort.Sort(ByNice(infos))
230         var payloads [][]byte
231         for _, info := range infos {
232                 payloads = append(payloads, MarshalLLP(LLPTypeInfo, info))
233                 ctx.LogD("llp-info-our", SDS{
234                         "node": nodeId,
235                         "name": ToBase32(info.Hash[:]),
236                         "size": strconv.FormatInt(int64(info.Size), 10),
237                 }, "")
238         }
239         ctx.LogI("llp-infos", SDS{
240                 "xx":   string(TTx),
241                 "node": nodeId,
242                 "pkts": strconv.Itoa(len(payloads)),
243                 "size": strconv.FormatInt(totalSize, 10),
244         }, "")
245         return payloadsSplit(payloads)
246 }
247
248 func (ctx *Ctx) ensureRxDir(nodeId *NodeId) error {
249         dirPath := filepath.Join(ctx.Spool, nodeId.String(), string(TRx))
250         if err := os.MkdirAll(dirPath, os.FileMode(0700)); err != nil {
251                 ctx.LogE("llp-ensure", SDS{"dir": dirPath, "err": err}, "")
252                 return err
253         }
254         fd, err := os.Open(dirPath)
255         if err != nil {
256                 ctx.LogE("llp-ensure", SDS{"dir": dirPath, "err": err}, "")
257                 return err
258         }
259         fd.Close()
260         return nil
261 }
262
263 func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx) (*LLPState, error) {
264         err := ctx.ensureRxDir(nodeId)
265         if err != nil {
266                 return nil, err
267         }
268         var rxLock *os.File
269         if xxOnly != nil && *xxOnly == TRx {
270                 rxLock, err = ctx.LockDir(nodeId, TRx)
271                 if err != nil {
272                         return nil, err
273                 }
274         }
275         var txLock *os.File
276         if xxOnly != nil && *xxOnly == TTx {
277                 txLock, err = ctx.LockDir(nodeId, TTx)
278                 if err != nil {
279                         return nil, err
280                 }
281         }
282         started := time.Now()
283         conf := noise.Config{
284                 CipherSuite: NoiseCipherSuite,
285                 Pattern:     noise.HandshakeIK,
286                 Initiator:   true,
287                 StaticKeypair: noise.DHKey{
288                         Private: ctx.Self.NoisePrv[:],
289                         Public:  ctx.Self.NoisePub[:],
290                 },
291                 PeerStatic: ctx.Neigh[*nodeId].NoisePub[:],
292         }
293         state := LLPState{
294                 ctx:        ctx,
295                 hs:         noise.NewHandshakeState(conf),
296                 NodeId:     nodeId,
297                 nice:       nice,
298                 payloads:   make(chan []byte),
299                 infosTheir: make(map[[32]byte]*LLPInfo),
300                 started:    started,
301                 rxLock:     rxLock,
302                 txLock:     txLock,
303                 xxOnly:     xxOnly,
304         }
305
306         var infosPayloads [][]byte
307         if xxOnly == nil || *xxOnly != TTx {
308                 infosPayloads = ctx.infosOur(nodeId, nice)
309         }
310         var firstPayload []byte
311         if len(infosPayloads) > 0 {
312                 firstPayload = infosPayloads[0]
313         }
314         for i := 0; i < (MaxLLPSize-len(firstPayload))/LLPHeadOverhead; i++ {
315                 firstPayload = append(firstPayload, LLPHaltMarshalized...)
316         }
317
318         var buf []byte
319         var payload []byte
320         buf, _, _ = state.hs.WriteMessage(nil, firstPayload)
321         sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(nice))}
322         ctx.LogD("llp-start", sds, "sending first message")
323         conn.SetWriteDeadline(time.Now().Add(DeadlineDuration))
324         if err = state.WriteLLP(conn, buf); err != nil {
325                 ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "")
326                 state.dirUnlock()
327                 return nil, err
328         }
329         ctx.LogD("llp-start", sds, "waiting for first message")
330         conn.SetReadDeadline(time.Now().Add(DeadlineDuration))
331         if buf, err = state.ReadLLP(conn); err != nil {
332                 ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "")
333                 state.dirUnlock()
334                 return nil, err
335         }
336         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
337         if err != nil {
338                 ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "")
339                 state.dirUnlock()
340                 return nil, err
341         }
342         ctx.LogD("llp-start", sds, "starting workers")
343         err = state.StartWorkers(conn, infosPayloads, payload)
344         if err != nil {
345                 ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "")
346                 state.dirUnlock()
347                 return nil, err
348         }
349         return &state, err
350 }
351
352 func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*LLPState, error) {
353         started := time.Now()
354         conf := noise.Config{
355                 CipherSuite: NoiseCipherSuite,
356                 Pattern:     noise.HandshakeIK,
357                 Initiator:   false,
358                 StaticKeypair: noise.DHKey{
359                         Private: ctx.Self.NoisePrv[:],
360                         Public:  ctx.Self.NoisePub[:],
361                 },
362         }
363         state := LLPState{
364                 ctx:        ctx,
365                 hs:         noise.NewHandshakeState(conf),
366                 nice:       nice,
367                 payloads:   make(chan []byte),
368                 infosTheir: make(map[[32]byte]*LLPInfo),
369                 started:    started,
370                 xxOnly:     xxOnly,
371         }
372         var buf []byte
373         var payload []byte
374         var err error
375         ctx.LogD(
376                 "llp-start",
377                 SDS{"nice": strconv.Itoa(int(nice))},
378                 "waiting for first message",
379         )
380         conn.SetReadDeadline(time.Now().Add(DeadlineDuration))
381         if buf, err = state.ReadLLP(conn); err != nil {
382                 ctx.LogE("llp-start", SDS{"err": err}, "")
383                 return nil, err
384         }
385         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
386                 ctx.LogE("llp-start", SDS{"err": err}, "")
387                 return nil, err
388         }
389
390         var nodeId *NodeId
391         for _, node := range ctx.Neigh {
392                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), node.NoisePub[:]) == 1 {
393                         nodeId = node.Id
394                         break
395                 }
396         }
397         if nodeId == nil {
398                 peerId := ToBase32(state.hs.PeerStatic())
399                 ctx.LogE("llp-start", SDS{"peer": peerId}, "unknown")
400                 return nil, errors.New("Unknown peer: " + peerId)
401         }
402         state.NodeId = nodeId
403         sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(nice))}
404
405         if ctx.ensureRxDir(nodeId); err != nil {
406                 return nil, err
407         }
408         var rxLock *os.File
409         if xxOnly != nil && *xxOnly == TRx {
410                 rxLock, err = ctx.LockDir(nodeId, TRx)
411                 if err != nil {
412                         return nil, err
413                 }
414         }
415         state.rxLock = rxLock
416         var txLock *os.File
417         if xxOnly != nil && *xxOnly == TTx {
418                 txLock, err = ctx.LockDir(nodeId, TTx)
419                 if err != nil {
420                         return nil, err
421                 }
422         }
423         state.txLock = txLock
424
425         var infosPayloads [][]byte
426         if xxOnly == nil || *xxOnly != TTx {
427                 infosPayloads = ctx.infosOur(nodeId, nice)
428         }
429         var firstPayload []byte
430         if len(infosPayloads) > 0 {
431                 firstPayload = infosPayloads[0]
432         }
433         for i := 0; i < (MaxLLPSize-len(firstPayload))/LLPHeadOverhead; i++ {
434                 firstPayload = append(firstPayload, LLPHaltMarshalized...)
435         }
436
437         ctx.LogD("llp-start", sds, "sending first message")
438         buf, state.csTheir, state.csOur = state.hs.WriteMessage(nil, firstPayload)
439         conn.SetWriteDeadline(time.Now().Add(DeadlineDuration))
440         if err = state.WriteLLP(conn, buf); err != nil {
441                 ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "")
442                 state.dirUnlock()
443                 return nil, err
444         }
445         ctx.LogD("llp-start", sds, "starting workers")
446         err = state.StartWorkers(conn, infosPayloads, payload)
447         if err != nil {
448                 state.dirUnlock()
449                 return nil, err
450         }
451         return &state, err
452 }
453
454 func (state *LLPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payload []byte) error {
455         sds := SDS{"node": state.NodeId, "nice": strconv.Itoa(int(state.nice))}
456         if len(infosPayloads) > 1 {
457                 go func() {
458                         for _, payload := range infosPayloads[1:] {
459                                 state.ctx.LogD(
460                                         "llp-work",
461                                         SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
462                                         "queuing remaining payload",
463                                 )
464                                 state.payloads <- payload
465                         }
466                 }()
467         }
468         state.ctx.LogD(
469                 "llp-work",
470                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
471                 "processing first payload",
472         )
473         replies, err := state.ProcessLLP(payload)
474         if err != nil {
475                 state.ctx.LogE("llp-work", SdsAdd(sds, SDS{"err": err}), "")
476                 return err
477         }
478         go func() {
479                 for _, reply := range replies {
480                         state.ctx.LogD(
481                                 "llp-work",
482                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}),
483                                 "queuing reply",
484                         )
485                         state.payloads <- reply
486                 }
487         }()
488         state.wg.Add(1)
489         go func() {
490                 defer state.wg.Done()
491                 for {
492                         if state.isDead {
493                                 return
494                         }
495                         var payload []byte
496                         select {
497                         case payload = <-state.payloads:
498                                 state.ctx.LogD(
499                                         "llp-xmit",
500                                         SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
501                                         "got payload",
502                                 )
503                         default:
504                         }
505                         if payload == nil {
506                                 state.RLock()
507                                 if len(state.queueTheir) == 0 {
508                                         state.ctx.LogD("llp-xmit", sds, "file queue is empty")
509                                         state.RUnlock()
510                                         time.Sleep(100 * time.Millisecond)
511                                         continue
512                                 }
513                                 freq := state.queueTheir[0]
514                                 state.RUnlock()
515                                 sdsp := SdsAdd(sds, SDS{
516                                         "xx":   string(TTx),
517                                         "hash": ToBase32(freq.Hash[:]),
518                                         "size": strconv.FormatInt(int64(freq.Offset), 10),
519                                 })
520                                 state.ctx.LogD("llp-file", sdsp, "queueing")
521                                 fd, err := os.Open(filepath.Join(
522                                         state.ctx.Spool,
523                                         state.NodeId.String(),
524                                         string(TTx),
525                                         ToBase32(freq.Hash[:]),
526                                 ))
527                                 if err != nil {
528                                         state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "")
529                                         break
530                                 }
531                                 state.ctx.LogD("llp-file", sdsp, "seeking")
532                                 if _, err = fd.Seek(int64(freq.Offset), 0); err != nil {
533                                         state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "")
534                                         break
535                                 }
536                                 buf := make([]byte, MaxLLPSize-LLPHeadOverhead-LLPFileOverhead)
537                                 n, err := fd.Read(buf)
538                                 if err != nil {
539                                         state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "")
540                                         break
541                                 }
542                                 buf = buf[:n]
543                                 state.ctx.LogD(
544                                         "llp-file",
545                                         SdsAdd(sdsp, SDS{"size": strconv.Itoa(n)}),
546                                         "read",
547                                 )
548                                 fi, err := fd.Stat()
549                                 if err != nil {
550                                         state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "")
551                                         break
552                                 }
553                                 fullSize := uint64(fi.Size())
554                                 fd.Close()
555                                 payload = MarshalLLP(LLPTypeFile, LLPFile{
556                                         Hash:    freq.Hash,
557                                         Offset:  freq.Offset,
558                                         Payload: buf,
559                                 })
560                                 state.ctx.LogP("llp-file", SdsAdd(sdsp, SDS{
561                                         "fullsize": strconv.FormatInt(int64(fullSize), 10),
562                                 }), "")
563                                 state.Lock()
564                                 if len(state.queueTheir) > 0 && *state.queueTheir[0].Hash == *freq.Hash {
565                                         if freq.Offset+uint64(len(buf)) == fullSize {
566                                                 state.ctx.LogD("llp-file", sdsp, "finished")
567                                                 if len(state.queueTheir) > 1 {
568                                                         state.queueTheir = state.queueTheir[1:]
569                                                 } else {
570                                                         state.queueTheir = state.queueTheir[:0]
571                                                 }
572                                         } else {
573                                                 state.queueTheir[0].Offset += uint64(len(buf))
574                                         }
575                                 } else {
576                                         state.ctx.LogD("llp-file", sdsp, "queue disappeared")
577                                 }
578                                 state.Unlock()
579                         }
580                         state.ctx.LogD(
581                                 "llp-xmit",
582                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
583                                 "sending",
584                         )
585                         conn.SetWriteDeadline(time.Now().Add(DeadlineDuration))
586                         if err := state.WriteLLP(conn, state.csOur.Encrypt(nil, nil, payload)); err != nil {
587                                 state.ctx.LogE("llp-xmit", SdsAdd(sds, SDS{"err": err}), "")
588                                 break
589                         }
590                 }
591                 state.isDead = true
592         }()
593         state.wg.Add(1)
594         go func() {
595                 defer state.wg.Done()
596                 for {
597                         if state.isDead {
598                                 return
599                         }
600                         state.ctx.LogD("llp-recv", sds, "waiting for payload")
601                         conn.SetReadDeadline(time.Now().Add(DeadlineDuration))
602                         payload, err := state.ReadLLP(conn)
603                         if err != nil {
604                                 unmarshalErr := err.(*xdr.UnmarshalError)
605                                 netErr, ok := unmarshalErr.Err.(net.Error)
606                                 if !(ok && netErr.Timeout()) {
607                                         state.ctx.LogE("llp-recv", SdsAdd(sds, SDS{"err": err}), "")
608                                 }
609                                 break
610                         }
611                         state.ctx.LogD(
612                                 "llp-recv",
613                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
614                                 "got payload",
615                         )
616                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
617                         if err != nil {
618                                 state.ctx.LogE("llp-recv", SdsAdd(sds, SDS{"err": err}), "")
619                                 break
620                         }
621                         state.ctx.LogD(
622                                 "llp-recv",
623                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
624                                 "processing",
625                         )
626                         replies, err := state.ProcessLLP(payload)
627                         if err != nil {
628                                 state.ctx.LogE("llp-recv", SdsAdd(sds, SDS{"err": err}), "")
629                                 break
630                         }
631                         go func() {
632                                 for _, reply := range replies {
633                                         state.ctx.LogD(
634                                                 "llp-recv",
635                                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}),
636                                                 "queuing reply",
637                                         )
638                                         state.payloads <- reply
639                                 }
640                         }()
641                 }
642                 state.isDead = true
643         }()
644         return nil
645 }
646
647 func (state *LLPState) Wait() {
648         state.wg.Wait()
649         state.dirUnlock()
650         state.Duration = time.Now().Sub(state.started)
651         state.RxSpeed = state.RxBytes
652         state.TxSpeed = state.TxBytes
653         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
654         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
655         if rxDuration > 0 {
656                 state.RxSpeed = state.RxBytes / rxDuration
657         }
658         if txDuration > 0 {
659                 state.TxSpeed = state.TxBytes / txDuration
660         }
661 }
662
663 func (state *LLPState) ProcessLLP(payload []byte) ([][]byte, error) {
664         sds := SDS{"node": state.NodeId, "nice": strconv.Itoa(int(state.nice))}
665         r := bytes.NewReader(payload)
666         var err error
667         var replies [][]byte
668         var infosGot bool
669         for r.Len() > 0 {
670                 state.ctx.LogD("llp-process", sds, "unmarshaling header")
671                 var head LLPHead
672                 if _, err = xdr.Unmarshal(r, &head); err != nil {
673                         state.ctx.LogE("llp-process", SdsAdd(sds, SDS{"err": err}), "")
674                         return nil, err
675                 }
676                 switch head.Type {
677                 case LLPTypeInfo:
678                         infosGot = true
679                         sdsp := SdsAdd(sds, SDS{"type": "info"})
680                         state.ctx.LogD("llp-process", sdsp, "unmarshaling packet")
681                         var info LLPInfo
682                         if _, err = xdr.Unmarshal(r, &info); err != nil {
683                                 state.ctx.LogE("llp-process", SdsAdd(sdsp, SDS{"err": err}), "")
684                                 return nil, err
685                         }
686                         sdsp = SdsAdd(sds, SDS{
687                                 "hash": ToBase32(info.Hash[:]),
688                                 "size": strconv.FormatInt(int64(info.Size), 10),
689                         })
690                         if info.Nice > state.nice {
691                                 state.ctx.LogD("llp-process", sdsp, "too nice")
692                                 continue
693                         }
694                         state.ctx.LogD("llp-process", sdsp, "received")
695                         if state.xxOnly != nil && *state.xxOnly == TTx {
696                                 continue
697                         }
698                         state.Lock()
699                         state.infosTheir[*info.Hash] = &info
700                         state.Unlock()
701                         state.ctx.LogD("llp-process", sdsp, "stating part")
702                         fi, err := os.Stat(filepath.Join(
703                                 state.ctx.Spool,
704                                 state.NodeId.String(),
705                                 string(TRx),
706                                 ToBase32(info.Hash[:])+PartSuffix,
707                         ))
708                         var offset int64
709                         if err == nil {
710                                 offset = fi.Size()
711                                 state.ctx.LogD(
712                                         "llp-process",
713                                         SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(offset, 10)}),
714                                         "part exists",
715                                 )
716                         }
717                         replies = append(replies, MarshalLLP(
718                                 LLPTypeFreq,
719                                 LLPFreq{info.Hash, uint64(offset)},
720                         ))
721                 case LLPTypeFile:
722                         state.ctx.LogD(
723                                 "llp-process",
724                                 SdsAdd(sds, SDS{"type": "file"}),
725                                 "unmarshaling packet",
726                         )
727                         var file LLPFile
728                         if _, err = xdr.Unmarshal(r, &file); err != nil {
729                                 state.ctx.LogE("llp-process", SdsAdd(sds, SDS{
730                                         "err":  err,
731                                         "type": "file",
732                                 }), "")
733                                 return nil, err
734                         }
735                         sdsp := SdsAdd(sds, SDS{
736                                 "xx":   string(TRx),
737                                 "hash": ToBase32(file.Hash[:]),
738                                 "size": strconv.Itoa(len(file.Payload)),
739                         })
740                         filePath := filepath.Join(
741                                 state.ctx.Spool,
742                                 state.NodeId.String(),
743                                 string(TRx),
744                                 ToBase32(file.Hash[:]),
745                         )
746                         state.ctx.LogD("llp-file", sdsp, "opening part")
747                         fd, err := os.OpenFile(
748                                 filePath+PartSuffix,
749                                 os.O_RDWR|os.O_CREATE,
750                                 os.FileMode(0600),
751                         )
752                         if err != nil {
753                                 state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "")
754                                 return nil, err
755                         }
756                         state.ctx.LogD(
757                                 "llp-file",
758                                 SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(int64(file.Offset), 10)}),
759                                 "seeking",
760                         )
761                         if _, err = fd.Seek(int64(file.Offset), 0); err != nil {
762                                 state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "")
763                                 fd.Close()
764                                 return nil, err
765                         }
766                         state.ctx.LogD("llp-file", sdsp, "writing")
767                         _, err = fd.Write(file.Payload)
768                         if err != nil {
769                                 state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "")
770                                 fd.Close()
771                                 return nil, err
772                         }
773                         ourSize := uint64(file.Offset) + uint64(len(file.Payload))
774                         sdsp["fullsize"] = strconv.FormatInt(int64(state.infosTheir[*file.Hash].Size), 10)
775                         sdsp["size"] = strconv.FormatInt(int64(ourSize), 10)
776                         state.ctx.LogP("llp-file", sdsp, "")
777                         if state.infosTheir[*file.Hash].Size != ourSize {
778                                 fd.Close()
779                                 continue
780                         }
781                         go func() {
782                                 if err := fd.Sync(); err != nil {
783                                         state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "sync")
784                                         fd.Close()
785                                         return
786                                 }
787                                 fd.Seek(0, 0)
788                                 state.ctx.LogD("llp-file", sdsp, "checking")
789                                 gut, err := Check(fd, file.Hash[:])
790                                 fd.Close()
791                                 if err != nil || !gut {
792                                         state.ctx.LogE("llp-file", sdsp, "checksum mismatch")
793                                         return
794                                 }
795                                 state.ctx.LogI("llp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "")
796                                 os.Rename(filePath+PartSuffix, filePath)
797                                 state.payloads <- MarshalLLP(LLPTypeDone, LLPDone{file.Hash})
798                         }()
799                 case LLPTypeDone:
800                         state.ctx.LogD(
801                                 "llp-process",
802                                 SdsAdd(sds, SDS{"type": "done"}),
803                                 "unmarshaling packet",
804                         )
805                         var done LLPDone
806                         if _, err = xdr.Unmarshal(r, &done); err != nil {
807                                 state.ctx.LogE("llp-process", SdsAdd(sds, SDS{
808                                         "type": "done",
809                                         "err":  err,
810                                 }), "")
811                                 return nil, err
812                         }
813                         sdsp := SdsAdd(sds, SDS{"hash": ToBase32(done.Hash[:])})
814                         state.ctx.LogD("llp-done", sdsp, "removing")
815                         err := os.Remove(filepath.Join(
816                                 state.ctx.Spool,
817                                 state.NodeId.String(),
818                                 string(TTx),
819                                 ToBase32(done.Hash[:]),
820                         ))
821                         if err == nil {
822                                 state.ctx.LogI("llp-done", SdsAdd(sdsp, SDS{"xx": string(TTx)}), "")
823                         } else {
824                                 state.ctx.LogE("llp-done", SdsAdd(sdsp, SDS{"xx": string(TTx)}), "")
825                         }
826                 case LLPTypeFreq:
827                         sdsp := SdsAdd(sds, SDS{"type": "freq"})
828                         state.ctx.LogD("llp-process", sdsp, "unmarshaling packet")
829                         var freq LLPFreq
830                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
831                                 state.ctx.LogE("llp-process", SdsAdd(sdsp, SDS{"err": err}), "")
832                                 return nil, err
833                         }
834                         state.ctx.LogD("llp-process", SdsAdd(sdsp, SDS{
835                                 "hash":   ToBase32(freq.Hash[:]),
836                                 "offset": strconv.FormatInt(int64(freq.Offset), 10),
837                         }), "queueing")
838                         state.Lock()
839                         state.queueTheir = append(state.queueTheir, &freq)
840                         state.Unlock()
841                 case LLPTypeHalt:
842                         sdsp := SdsAdd(sds, SDS{"type": "halt"})
843                         state.ctx.LogD("llp-process", sdsp, "")
844                         state.Lock()
845                         state.queueTheir = nil
846                         state.Unlock()
847                 default:
848                         state.ctx.LogE(
849                                 "llp-process",
850                                 SdsAdd(sds, SDS{"type": head.Type}),
851                                 "unknown",
852                         )
853                         return nil, BadPktType
854                 }
855         }
856         if infosGot {
857                 var pkts int
858                 var size uint64
859                 for _, info := range state.infosTheir {
860                         pkts++
861                         size += info.Size
862                 }
863                 state.ctx.LogI("llp-infos", SDS{
864                         "xx":   string(TRx),
865                         "node": state.NodeId,
866                         "pkts": strconv.Itoa(pkts),
867                         "size": strconv.FormatInt(int64(size), 10),
868                 }, "")
869         }
870         return payloadsSplit(replies), nil
871 }