2 NNCP -- Node-to-Node CoPy
3 Copyright (C) 2016-2017 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
34 "github.com/davecgh/go-xdr/xdr2"
35 "github.com/flynn/noise"
// Size limit used when batching LLP payloads (see payloadsSplit).
// 2<<15 is 65536, so the value is 65280.
39 MaxLLPSize = 2<<15 - 256
// Magic placed in every LLPRaw written by WriteLLP and compared by ReadLLP:
// the bytes 'N','N','C','P','L' followed by 1, 0, 0.
44 MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'L', 1, 0, 0}
// Bytes appended repeatedly to the first handshake payload in StartI and
// StartR. No initial value is given on this line.
50 LLPHaltMarshalized []byte
// Cipher suite handed to noise.Config by both StartI and StartR.
52 NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
54 noise.CipherChaChaPoly,
// Added to time.Now() for every SetReadDeadline/SetWriteDeadline call in this file.
58 DeadlineDuration time.Duration = 10 * time.Second
// Packet type tags carried in LLPHead. Each line repeats "= iota", so a tag's
// value is its position inside the enclosing const block (its opening line is
// not part of this listing).
64 LLPTypeInfo LLPType = iota
65 LLPTypeFreq LLPType = iota
66 LLPTypeFile LLPType = iota
67 LLPTypeDone LLPType = iota
68 LLPTypeHalt LLPType = iota
// Marshal one sample of each packet kind and record buf.Len() as that kind's
// overhead. The enclosing function header, the error branches and whatever
// happens to buf between samples are not part of this listing.
103 llpHead := LLPHead{Type: LLPTypeHalt}
104 if _, err := xdr.Marshal(&buf, llpHead); err != nil {
// NOTE(review): copy transfers min(len(dst), len(src)) bytes. The visible
// declaration of LLPHaltMarshalized gives it no length, so unless it is sized
// somewhere not shown, nothing is copied and the halt padding in StartI/StartR
// appends empty slices. Suggested fix: assign a copy instead, e.g.
// LLPHaltMarshalized = append([]byte(nil), buf.Bytes()...).
107 copy(LLPHaltMarshalized, buf.Bytes())
108 LLPHeadOverhead = buf.Len()
// Sample info packet; only the encoded length is kept.
111 llpInfo := LLPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
112 if _, err := xdr.Marshal(&buf, llpInfo); err != nil {
115 LLPInfoOverhead = buf.Len()
// Sample file-request packet.
118 llpFreq := LLPFreq{Hash: new([32]byte), Offset: 123}
119 if _, err := xdr.Marshal(&buf, llpFreq); err != nil {
122 LLPFreqOverhead = buf.Len()
// Sample file packet; no Payload is set, so this measures the packet without data.
125 llpFile := LLPFile{Hash: new([32]byte), Offset: 123}
126 if _, err := xdr.Marshal(&buf, llpFile); err != nil {
129 LLPFileOverhead = buf.Len()
// MarshalLLP XDR-encodes an LLPHead carrying typ and then the packet llp into
// the same buffer, so the head always precedes the body. The error branches
// and the return statement are not part of this listing.
132 func MarshalLLP(typ LLPType, llp interface{}) []byte {
135 if _, err = xdr.Marshal(&buf, LLPHead{typ}); err != nil {
138 if _, err = xdr.Marshal(&buf, llp); err != nil {
// payloadsSplit packs consecutive payloads into outbound buffers. A buffer is
// closed as soon as appending the next payload would push it past MaxLLPSize.
// Payloads themselves are never cut: one that is larger than MaxLLPSize on its
// own still lands whole in a buffer. The closing braces and the return are not
// part of this listing.
144 func payloadsSplit(payloads [][]byte) [][]byte {
145 var outbounds [][]byte
// Every buffer is allocated with the full MaxLLPSize capacity up front.
146 outbound := make([]byte, 0, MaxLLPSize)
147 for i, payload := range payloads {
148 outbound = append(outbound, payload...)
// Look ahead: flush now if the following payload would not fit.
149 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxLLPSize {
150 outbounds = append(outbounds, outbound)
151 outbound = make([]byte, 0, MaxLLPSize)
// Flush the trailing, partially filled buffer; empty input adds nothing.
154 if len(outbound) > 0 {
155 outbounds = append(outbounds, outbound)
// LLPState holds one session's state. Only some of its fields survive in this
// listing; others used elsewhere in the file (ctx, nice, payloads, wg, the
// Rx/Tx counters, rxLock/txLock, ...) are declared on lines not shown.
160 type LLPState struct {
// Noise handshake state created in StartI/StartR.
164 hs *noise.HandshakeState
// csOur encrypts outgoing payloads and csTheir decrypts incoming ones (see StartWorkers).
165 csOur *noise.CipherState
166 csTheir *noise.CipherState
// Infos announced by the peer, keyed by the 32-byte hash; filled in ProcessLLP.
168 infosTheir map[[32]byte]*LLPInfo
// File requests received from the peer: appended in ProcessLLP, served from
// the front by the sending loop in StartWorkers.
169 queueTheir []*LLPFreq
// Set by Wait to the time elapsed since the session started.
177 Duration time.Duration
// dirUnlock hands both directory locks, rx first and then tx, to ctx.UnlockDir.
186 func (state *LLPState) dirUnlock() {
187 state.ctx.UnlockDir(state.rxLock)
188 state.ctx.UnlockDir(state.txLock)
// WriteLLP wraps payload in an LLPRaw carrying MagicNNCPLv1 and XDR-marshals
// it to dst. It also stamps TxLastSeen and adds the marshalled byte count to
// TxBytes; the condition guarding that bookkeeping and the return statement
// are not part of this listing.
191 func (state *LLPState) WriteLLP(dst io.Writer, payload []byte) error {
192 n, err := xdr.Marshal(dst, LLPRaw{Magic: MagicNNCPLv1, Payload: payload})
194 state.TxLastSeen = time.Now()
195 state.TxBytes += int64(n)
// ReadLLP XDR-unmarshals one LLPRaw from src and returns its Payload. The
// declaration of llp, the handling of the unmarshal error and the value
// returned on a magic mismatch are not part of this listing.
200 func (state *LLPState) ReadLLP(src io.Reader) ([]byte, error) {
202 n, err := xdr.Unmarshal(src, &llp)
// The receive statistics are updated before the magic is checked.
206 state.RxLastSeen = time.Now()
207 state.RxBytes += int64(n)
208 if llp.Magic != MagicNNCPLv1 {
211 return llp.Payload, nil
// infosOur builds one LLPInfo per outgoing (TTx) job of nodeId, sorts them
// with ByNice, marshals each as an LLPTypeInfo packet and returns the packets
// batched by payloadsSplit. Declarations of infos/totalSize and several
// closing lines are not part of this listing.
214 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8) [][]byte {
217 for job := range ctx.Jobs(nodeId, TTx) {
// Jobs whose niceness exceeds the session limit take this branch; its body is
// not shown (presumably the job is skipped).
219 if job.PktEnc.Nice > nice {
222 totalSize += job.Size
223 infos = append(infos, &LLPInfo{
224 Nice: job.PktEnc.Nice,
225 Size: uint64(job.Size),
229 sort.Sort(ByNice(infos))
230 var payloads [][]byte
231 for _, info := range infos {
232 payloads = append(payloads, MarshalLLP(LLPTypeInfo, info))
// Debug entry per announced packet.
233 ctx.LogD("llp-info-our", SDS{
235 "name": ToBase32(info.Hash[:]),
236 "size": strconv.FormatInt(int64(info.Size), 10),
// Summary entry: number of info packets and their accumulated job size.
239 ctx.LogI("llp-infos", SDS{
242 "pkts": strconv.Itoa(len(payloads)),
243 "size": strconv.FormatInt(totalSize, 10),
245 return payloadsSplit(payloads)
// ensureRxDir creates <Spool>/<node>/<TRx> with mode 0700 (MkdirAll succeeds
// if it already exists) and then opens the directory. Both failures are logged
// under "llp-ensure". What is done with fd and the return statements are not
// part of this listing.
248 func (ctx *Ctx) ensureRxDir(nodeId *NodeId) error {
249 dirPath := filepath.Join(ctx.Spool, nodeId.String(), string(TRx))
250 if err := os.MkdirAll(dirPath, os.FileMode(0700)); err != nil {
251 ctx.LogE("llp-ensure", SDS{"dir": dirPath, "err": err}, "")
254 fd, err := os.Open(dirPath)
256 ctx.LogE("llp-ensure", SDS{"dir": dirPath, "err": err}, "")
// StartI runs the initiator side of a session with nodeId over conn. It
// prepares the rx directory and locks, sends the first Noise IK handshake
// message carrying our first info batch, reads the peer's reply, and hands
// over to StartWorkers. Error returns, several declarations and the final
// return are not part of this listing.
263 func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx) (*LLPState, error) {
264 err := ctx.ensureRxDir(nodeId)
// NOTE(review): each lock is taken only when xxOnly is non-nil and names that
// direction, so with xxOnly == nil neither directory is locked here. If an
// unrestricted session should hold both locks, the conditions would need to be
// xxOnly == nil || *xxOnly == TRx (and likewise TTx). Confirm the intent.
269 if xxOnly != nil && *xxOnly == TRx {
270 rxLock, err = ctx.LockDir(nodeId, TRx)
276 if xxOnly != nil && *xxOnly == TTx {
277 txLock, err = ctx.LockDir(nodeId, TTx)
282 started := time.Now()
// IK pattern: the initiator supplies the peer's static key up front, taken
// from the neighbour table.
283 conf := noise.Config{
284 CipherSuite: NoiseCipherSuite,
285 Pattern: noise.HandshakeIK,
287 StaticKeypair: noise.DHKey{
288 Private: ctx.Self.NoisePrv[:],
289 Public: ctx.Self.NoisePub[:],
291 PeerStatic: ctx.Neigh[*nodeId].NoisePub[:],
295 hs: noise.NewHandshakeState(conf),
// Unbuffered channel: every send blocks until the sending loop receives it.
298 payloads: make(chan []byte),
299 infosTheir: make(map[[32]byte]*LLPInfo),
306 var infosPayloads [][]byte
// NOTE(review): our outgoing-job infos are skipped exactly when xxOnly is TTx,
// and ProcessLLP ignores the peer's infos in that same mode. Taken together a
// TTx-only session would announce and request nothing; check whether TRx was
// meant here.
307 if xxOnly == nil || *xxOnly != TTx {
308 infosPayloads = ctx.infosOur(nodeId, nice)
310 var firstPayload []byte
311 if len(infosPayloads) > 0 {
312 firstPayload = infosPayloads[0]
// Pad the first payload with marshalled halt packets. NOTE(review): the loop
// bound is re-evaluated on every iteration with the current len(firstPayload),
// so if the appended slice is non-empty the bound shrinks while i grows and
// padding stops short of MaxLLPSize; hoist the count out of the loop if full
// padding is intended.
314 for i := 0; i < (MaxLLPSize-len(firstPayload))/LLPHeadOverhead; i++ {
315 firstPayload = append(firstPayload, LLPHaltMarshalized...)
// First handshake message; the two cipher-state results are discarded here
// and taken from ReadMessage below.
320 buf, _, _ = state.hs.WriteMessage(nil, firstPayload)
321 sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(nice))}
322 ctx.LogD("llp-start", sds, "sending first message")
323 conn.SetWriteDeadline(time.Now().Add(DeadlineDuration))
324 if err = state.WriteLLP(conn, buf); err != nil {
325 ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "")
329 ctx.LogD("llp-start", sds, "waiting for first message")
330 conn.SetReadDeadline(time.Now().Add(DeadlineDuration))
331 if buf, err = state.ReadLLP(conn); err != nil {
332 ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "")
// The peer's reply yields its first payload plus both cipher states, stored
// in the order (csOur, csTheir).
336 payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
338 ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "")
342 ctx.LogD("llp-start", sds, "starting workers")
343 err = state.StartWorkers(conn, infosPayloads, payload)
345 ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "")
// StartR runs the responder side of a session over conn. It reads the first
// Noise IK handshake message, identifies the peer by its static key among
// ctx.Neigh, prepares the rx directory and locks, replies with our first info
// batch, and hands over to StartWorkers. Error returns, several declarations
// and the final return are not part of this listing.
352 func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*LLPState, error) {
353 started := time.Now()
// Unlike StartI, no PeerStatic is visible in this config; the peer's key is
// read from the handshake state below.
354 conf := noise.Config{
355 CipherSuite: NoiseCipherSuite,
356 Pattern: noise.HandshakeIK,
358 StaticKeypair: noise.DHKey{
359 Private: ctx.Self.NoisePrv[:],
360 Public: ctx.Self.NoisePub[:],
365 hs: noise.NewHandshakeState(conf),
// Unbuffered channel: every send blocks until the sending loop receives it.
367 payloads: make(chan []byte),
368 infosTheir: make(map[[32]byte]*LLPInfo),
377 SDS{"nice": strconv.Itoa(int(nice))},
378 "waiting for first message",
380 conn.SetReadDeadline(time.Now().Add(DeadlineDuration))
381 if buf, err = state.ReadLLP(conn); err != nil {
382 ctx.LogE("llp-start", SDS{"err": err}, "")
// Cipher states returned by this first ReadMessage are discarded; they are
// taken from WriteMessage further down.
385 if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
386 ctx.LogE("llp-start", SDS{"err": err}, "")
// Match the handshake's peer static key against every neighbour's NoisePub
// using a constant-time comparison.
391 for _, node := range ctx.Neigh {
392 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), node.NoisePub[:]) == 1 {
// Unknown peer: log the key in base32 and fail.
398 peerId := ToBase32(state.hs.PeerStatic())
399 ctx.LogE("llp-start", SDS{"peer": peerId}, "unknown")
400 return nil, errors.New("Unknown peer: " + peerId)
402 state.NodeId = nodeId
403 sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(nice))}
// NOTE(review): the result of ensureRxDir is discarded. The call is only the
// init statement, and the condition tests whatever err held before, so a
// directory failure goes unnoticed. Suggested fix:
// if err = ctx.ensureRxDir(nodeId); err != nil { ... }
405 if ctx.ensureRxDir(nodeId); err != nil {
// NOTE(review): as in StartI, each lock is taken only when xxOnly is non-nil
// and names that direction; with xxOnly == nil nothing is locked here.
409 if xxOnly != nil && *xxOnly == TRx {
410 rxLock, err = ctx.LockDir(nodeId, TRx)
415 state.rxLock = rxLock
417 if xxOnly != nil && *xxOnly == TTx {
418 txLock, err = ctx.LockDir(nodeId, TTx)
423 state.txLock = txLock
425 var infosPayloads [][]byte
// NOTE(review): same condition as in StartI. Our infos are skipped when
// xxOnly is TTx; check whether TRx was meant.
426 if xxOnly == nil || *xxOnly != TTx {
427 infosPayloads = ctx.infosOur(nodeId, nice)
429 var firstPayload []byte
430 if len(infosPayloads) > 0 {
431 firstPayload = infosPayloads[0]
// Halt padding; the bound is re-evaluated each iteration against the current
// len(firstPayload), exactly as in StartI.
433 for i := 0; i < (MaxLLPSize-len(firstPayload))/LLPHeadOverhead; i++ {
434 firstPayload = append(firstPayload, LLPHaltMarshalized...)
437 ctx.LogD("llp-start", sds, "sending first message")
// The cipher states are stored as (csTheir, csOur), the reverse of the
// (csOur, csTheir) order StartI uses for ReadMessage's results.
438 buf, state.csTheir, state.csOur = state.hs.WriteMessage(nil, firstPayload)
439 conn.SetWriteDeadline(time.Now().Add(DeadlineDuration))
440 if err = state.WriteLLP(conn, buf); err != nil {
441 ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "")
445 ctx.LogD("llp-start", sds, "starting workers")
446 err = state.StartWorkers(conn, infosPayloads, payload)
// StartWorkers queues the info batches not already sent with the handshake,
// processes the peer's first payload, and then runs a sending loop and a
// receiving loop on conn. The two "defer state.wg.Done()" lines suggest each
// loop is its own goroutine tracked by state.wg, but the lines that start them
// are not part of this listing; nor are the loop headers, the error exits and
// the final return.
454 func (state *LLPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payload []byte) error {
455 sds := SDS{"node": state.NodeId, "nice": strconv.Itoa(int(state.nice))}
// infosPayloads[0] already travelled inside the handshake message; queue the rest.
456 if len(infosPayloads) > 1 {
// This loop variable shadows the payload parameter.
458 for _, payload := range infosPayloads[1:] {
461 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
462 "queuing remaining payload",
464 state.payloads <- payload
// Handle the payload that arrived with the peer's handshake message.
470 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
471 "processing first payload",
473 replies, err := state.ProcessLLP(payload)
475 state.ctx.LogE("llp-work", SdsAdd(sds, SDS{"err": err}), "")
479 for _, reply := range replies {
482 SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}),
485 state.payloads <- reply
// ---- sending loop ----
490 defer state.wg.Done()
// A payload queued on the channel is taken here; otherwise the code below
// builds a file packet from the request queue.
497 case payload = <-state.payloads:
500 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
// Nothing requested yet: wait 100 ms before looking again.
507 if len(state.queueTheir) == 0 {
508 state.ctx.LogD("llp-xmit", sds, "file queue is empty")
510 time.Sleep(100 * time.Millisecond)
// Serve the request at the head of the queue.
// NOTE(review): queueTheir is also written by ProcessLLP (append and reset to
// nil), which the receiving loop calls; no lock is visible in this listing.
// Confirm how access is synchronised.
513 freq := state.queueTheir[0]
515 sdsp := SdsAdd(sds, SDS{
517 "hash": ToBase32(freq.Hash[:]),
518 "size": strconv.FormatInt(int64(freq.Offset), 10),
520 state.ctx.LogD("llp-file", sdsp, "queueing")
521 fd, err := os.Open(filepath.Join(
523 state.NodeId.String(),
525 ToBase32(freq.Hash[:]),
528 state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "")
531 state.ctx.LogD("llp-file", sdsp, "seeking")
// Whence 0 is relative to the start of the file.
532 if _, err = fd.Seek(int64(freq.Offset), 0); err != nil {
533 state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "")
// Chunk size leaves room for the head and file-packet overhead within MaxLLPSize.
536 buf := make([]byte, MaxLLPSize-LLPHeadOverhead-LLPFileOverhead)
537 n, err := fd.Read(buf)
539 state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "")
545 SdsAdd(sdsp, SDS{"size": strconv.Itoa(n)}),
550 state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "")
553 fullSize := uint64(fi.Size())
555 payload = MarshalLLP(LLPTypeFile, LLPFile{
560 state.ctx.LogP("llp-file", SdsAdd(sdsp, SDS{
561 "fullsize": strconv.FormatInt(int64(fullSize), 10),
// Re-check that the queue head is still the request just served before
// touching it. When the chunk reaches the end of the file the head is
// dropped, otherwise its Offset advances by the chunk length.
564 if len(state.queueTheir) > 0 && *state.queueTheir[0].Hash == *freq.Hash {
565 if freq.Offset+uint64(len(buf)) == fullSize {
566 state.ctx.LogD("llp-file", sdsp, "finished")
567 if len(state.queueTheir) > 1 {
568 state.queueTheir = state.queueTheir[1:]
570 state.queueTheir = state.queueTheir[:0]
573 state.queueTheir[0].Offset += uint64(len(buf))
576 state.ctx.LogD("llp-file", sdsp, "queue disappeared")
582 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
// Encrypt with our cipher state and write under a fresh deadline.
585 conn.SetWriteDeadline(time.Now().Add(DeadlineDuration))
586 if err := state.WriteLLP(conn, state.csOur.Encrypt(nil, nil, payload)); err != nil {
587 state.ctx.LogE("llp-xmit", SdsAdd(sds, SDS{"err": err}), "")
// ---- receiving loop ----
595 defer state.wg.Done()
600 state.ctx.LogD("llp-recv", sds, "waiting for payload")
601 conn.SetReadDeadline(time.Now().Add(DeadlineDuration))
602 payload, err := state.ReadLLP(conn)
// NOTE(review): a single-value type assertion panics when err has any other
// dynamic type. ReadLLP has a magic-mismatch path whose returned error is not
// visible here; if that is not an *xdr.UnmarshalError, this line would panic.
// Suggested fix: use the two-value form, unmarshalErr, ok := err.(*xdr.UnmarshalError),
// and treat !ok as a non-timeout error.
604 unmarshalErr := err.(*xdr.UnmarshalError)
605 netErr, ok := unmarshalErr.Err.(net.Error)
// Only errors that are not network timeouts are logged here.
606 if !(ok && netErr.Timeout()) {
607 state.ctx.LogE("llp-recv", SdsAdd(sds, SDS{"err": err}), "")
613 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
// Decrypt with the peer's cipher state.
616 payload, err = state.csTheir.Decrypt(nil, nil, payload)
618 state.ctx.LogE("llp-recv", SdsAdd(sds, SDS{"err": err}), "")
623 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
626 replies, err := state.ProcessLLP(payload)
628 state.ctx.LogE("llp-recv", SdsAdd(sds, SDS{"err": err}), "")
// Replies go back through the payloads channel to the sending loop.
632 for _, reply := range replies {
635 SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}),
638 state.payloads <- reply
// Wait fills in the session statistics: Duration since the session started and
// the Rx/Tx speeds. The first statements of the function and the conditions
// around the two divisions are not part of this listing.
647 func (state *LLPState) Wait() {
650 state.Duration = time.Now().Sub(state.started)
// The speeds start out as the raw byte counts.
651 state.RxSpeed = state.RxBytes
652 state.TxSpeed = state.TxBytes
// Durations are truncated to whole seconds, so anything under one second is 0.
653 rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
654 txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
// NOTE(review): integer division by a whole-second duration; verify that the
// guards (not shown) exclude zero, and negative values if RxLastSeen or
// TxLastSeen was never set.
656 state.RxSpeed = state.RxBytes / rxDuration
659 state.TxSpeed = state.TxBytes / txDuration
// ProcessLLP decodes the packets contained in one decrypted payload and acts
// on each according to its LLPHead type: info, file, done, freq or halt.
// Replies generated along the way are returned batched by payloadsSplit. The
// loop header, the switch/case lines, the error returns and several
// declarations are not part of this listing.
663 func (state *LLPState) ProcessLLP(payload []byte) ([][]byte, error) {
664 sds := SDS{"node": state.NodeId, "nice": strconv.Itoa(int(state.nice))}
665 r := bytes.NewReader(payload)
670 state.ctx.LogD("llp-process", sds, "unmarshaling header")
672 if _, err = xdr.Unmarshal(r, &head); err != nil {
673 state.ctx.LogE("llp-process", SdsAdd(sds, SDS{"err": err}), "")
// ---- info: the peer announces a packet it can send ----
679 sdsp := SdsAdd(sds, SDS{"type": "info"})
680 state.ctx.LogD("llp-process", sdsp, "unmarshaling packet")
682 if _, err = xdr.Unmarshal(r, &info); err != nil {
683 state.ctx.LogE("llp-process", SdsAdd(sdsp, SDS{"err": err}), "")
686 sdsp = SdsAdd(sds, SDS{
687 "hash": ToBase32(info.Hash[:]),
688 "size": strconv.FormatInt(int64(info.Size), 10),
// Announcements nicer than the session limit take the "too nice" branch.
690 if info.Nice > state.nice {
691 state.ctx.LogD("llp-process", sdsp, "too nice")
694 state.ctx.LogD("llp-process", sdsp, "received")
// In TTx-only mode this branch is taken before the info is recorded; its body
// is not shown (presumably the info is skipped).
695 if state.xxOnly != nil && *state.xxOnly == TTx {
699 state.infosTheir[*info.Hash] = &info
// Stat the partial file (name + PartSuffix). How offset is derived from the
// result is not shown; presumably it is the size already on disk.
701 state.ctx.LogD("llp-process", sdsp, "stating part")
702 fi, err := os.Stat(filepath.Join(
704 state.NodeId.String(),
706 ToBase32(info.Hash[:])+PartSuffix,
713 SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(offset, 10)}),
// Reply with a file request for this hash starting at offset.
717 replies = append(replies, MarshalLLP(
719 LLPFreq{info.Hash, uint64(offset)},
// ---- file: a chunk of a packet we requested ----
724 SdsAdd(sds, SDS{"type": "file"}),
725 "unmarshaling packet",
728 if _, err = xdr.Unmarshal(r, &file); err != nil {
729 state.ctx.LogE("llp-process", SdsAdd(sds, SDS{
735 sdsp := SdsAdd(sds, SDS{
737 "hash": ToBase32(file.Hash[:]),
738 "size": strconv.Itoa(len(file.Payload)),
740 filePath := filepath.Join(
742 state.NodeId.String(),
744 ToBase32(file.Hash[:]),
746 state.ctx.LogD("llp-file", sdsp, "opening part")
747 fd, err := os.OpenFile(
749 os.O_RDWR|os.O_CREATE,
753 state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "")
758 SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(int64(file.Offset), 10)}),
// Whence 0 is relative to the start of the file.
761 if _, err = fd.Seek(int64(file.Offset), 0); err != nil {
762 state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "")
766 state.ctx.LogD("llp-file", sdsp, "writing")
767 _, err = fd.Write(file.Payload)
769 state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "")
773 ourSize := uint64(file.Offset) + uint64(len(file.Payload))
// NOTE(review): infosTheir[*file.Hash] is dereferenced with no presence check
// visible. A file packet for a hash that was never announced (or was skipped
// above) yields a nil *LLPInfo, and reading .Size panics. Suggested fix: look
// the entry up once with the two-value form and reject the packet when missing.
774 sdsp["fullsize"] = strconv.FormatInt(int64(state.infosTheir[*file.Hash].Size), 10)
775 sdsp["size"] = strconv.FormatInt(int64(ourSize), 10)
776 state.ctx.LogP("llp-file", sdsp, "")
// The announced size has not been reached yet: this chunk was not the last.
777 if state.infosTheir[*file.Hash].Size != ourSize {
// Whole file received: sync, verify, rename into place, acknowledge.
782 if err := fd.Sync(); err != nil {
783 state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "sync")
788 state.ctx.LogD("llp-file", sdsp, "checking")
789 gut, err := Check(fd, file.Hash[:])
// NOTE(review): a non-nil err from Check is also reported as "checksum
// mismatch", and err itself is not logged.
791 if err != nil || !gut {
792 state.ctx.LogE("llp-file", sdsp, "checksum mismatch")
795 state.ctx.LogI("llp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "")
// NOTE(review): the result of os.Rename is ignored, so the done packet below
// is sent even if the rename failed. Suggested fix: check the error and log
// and return before acknowledging.
796 os.Rename(filePath+PartSuffix, filePath)
// Sent directly on the payloads channel rather than appended to replies.
797 state.payloads <- MarshalLLP(LLPTypeDone, LLPDone{file.Hash})
// ---- done: the peer confirms it has one of our packets ----
802 SdsAdd(sds, SDS{"type": "done"}),
803 "unmarshaling packet",
806 if _, err = xdr.Unmarshal(r, &done); err != nil {
807 state.ctx.LogE("llp-process", SdsAdd(sds, SDS{
813 sdsp := SdsAdd(sds, SDS{"hash": ToBase32(done.Hash[:])})
814 state.ctx.LogD("llp-done", sdsp, "removing")
815 err := os.Remove(filepath.Join(
817 state.NodeId.String(),
819 ToBase32(done.Hash[:]),
// Info entry and error entry for the removal result (the if/else lines are
// not shown). NOTE(review): the LogE entry does not include err.
822 state.ctx.LogI("llp-done", SdsAdd(sdsp, SDS{"xx": string(TTx)}), "")
824 state.ctx.LogE("llp-done", SdsAdd(sdsp, SDS{"xx": string(TTx)}), "")
// ---- freq: the peer requests one of our packets from an offset ----
827 sdsp := SdsAdd(sds, SDS{"type": "freq"})
828 state.ctx.LogD("llp-process", sdsp, "unmarshaling packet")
830 if _, err = xdr.Unmarshal(r, &freq); err != nil {
831 state.ctx.LogE("llp-process", SdsAdd(sdsp, SDS{"err": err}), "")
834 state.ctx.LogD("llp-process", SdsAdd(sdsp, SDS{
835 "hash": ToBase32(freq.Hash[:]),
836 "offset": strconv.FormatInt(int64(freq.Offset), 10),
// Appended to the queue that the sending loop in StartWorkers serves.
839 state.queueTheir = append(state.queueTheir, &freq)
// ---- halt: drop every pending file request ----
842 sdsp := SdsAdd(sds, SDS{"type": "halt"})
843 state.ctx.LogD("llp-process", sdsp, "")
845 state.queueTheir = nil
// ---- any other type is rejected ----
850 SdsAdd(sds, SDS{"type": head.Type}),
853 return nil, BadPktType
// Summary of the infos known from the peer; how pkts and size are accumulated
// inside this loop is not shown.
859 for _, info := range state.infosTheir {
863 state.ctx.LogI("llp-infos", SDS{
865 "node": state.NodeId,
866 "pkts": strconv.Itoa(pkts),
867 "size": strconv.FormatInt(int64(size), 10),
870 return payloadsSplit(replies), nil