]> Cypherpunks.ru repositories - gostls13.git/commitdiff
runtime: implement wasip1 netpoll
authorChris O'Hara <cohara87@gmail.com>
Mon, 8 May 2023 07:07:18 +0000 (17:07 +1000)
committerGopher Robot <gobot@golang.org>
Thu, 25 May 2023 00:12:40 +0000 (00:12 +0000)
Implements netpoll using WASI's poll_oneoff system call.

This enables non-blocking I/O support for wasip1.

Change-Id: Ie395fa49d651c8b8262d485e2847dd65b0a10bc6
Reviewed-on: https://go-review.googlesource.com/c/go/+/493357
Reviewed-by: Dmitri Shuralyov <dmitshur@google.com>
Reviewed-by: Dmitri Shuralyov <dmitshur@golang.org>
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Matthew Dempsky <mdempsky@google.com>
Reviewed-by: Johan Brandhorst-Satzkorn <johan.brandhorst@gmail.com>
Reviewed-by: Julien Fabre <ju.pryz@gmail.com>
Auto-Submit: Johan Brandhorst-Satzkorn <johan.brandhorst@gmail.com>
Run-TryBot: Ian Lance Taylor <iant@golang.org>

12 files changed:
misc/wasm/go_wasip1_wasm_exec
src/cmd/dist/test.go
src/internal/poll/errno_unix.go
src/internal/poll/fd_poll_js.go [moved from src/internal/poll/fd_poll_wasm.go with 98% similarity]
src/internal/poll/fd_poll_runtime.go
src/runtime/internal/wasitest/host_test.go [new file with mode: 0644]
src/runtime/internal/wasitest/nonblock_test.go [new file with mode: 0644]
src/runtime/internal/wasitest/testdata/nonblock.go [new file with mode: 0644]
src/runtime/netpoll.go
src/runtime/netpoll_fake.go
src/runtime/netpoll_wasip1.go [new file with mode: 0644]
src/runtime/os_wasip1.go

index 72228d0501270d43d2e56a3305a83d47c1163e82..abcac8df36cb748cdec9300eef758d9878e960fa 100755 (executable)
@@ -5,10 +5,10 @@
 
 case "$GOWASIRUNTIME" in
        "wasmedge")
-               exec wasmedge --dir=/ --env PWD="$PWD" "$1" "${@:2}"
+               exec wasmedge --dir=/ --env PWD="$PWD" ${GOWASIRUNTIMEARGS:-} "$1" "${@:2}"
                ;;
        "wasmer")
-               exec wasmer run --dir=/ --env PWD="$PWD" "$1" -- "${@:2}"
+               exec wasmer run --dir=/ --env PWD="$PWD" ${GOWASIRUNTIMEARGS:-} "$1" -- "${@:2}"
                ;;
        "wasmtime")
                exec wasmtime run --dir=/ --env PWD="$PWD" --max-wasm-stack 1048576 "$1" -- "${@:2}"
index 3384149391575da9bcbc133e56300738910ec09a..f16bf32bbf426353c977e1d5c7551ca8f8d4c21a 100644 (file)
@@ -574,7 +574,8 @@ func (t *tester) registerTests() {
        // registerStdTestSpecially tracks import paths in the standard library
        // whose test registration happens in a special way.
        registerStdTestSpecially := map[string]bool{
-               "cmd/internal/testdir": true, // Registered at the bottom with sharding.
+               "runtime/internal/wasitest": true, // Registered at the bottom as a host test.
+               "cmd/internal/testdir":      true, // Registered at the bottom with sharding.
        }
 
        // Fast path to avoid the ~1 second of `go list std cmd` when
@@ -786,6 +787,16 @@ func (t *tester) registerTests() {
                t.registerCgoTests(cgoHeading)
        }
 
+       if goos == "wasip1" {
+               t.registerTest("wasip1 host tests",
+                       &goTest{
+                               variant:   "host",
+                               pkg:       "runtime/internal/wasitest",
+                               timeout:   1 * time.Minute,
+                               runOnHost: true,
+                       })
+       }
+
        if goos != "android" && !t.iOS() {
                // Only start multiple test dir shards on builders,
                // where they get distributed to multiple machines.
index 8eed93a31cb737c2dfea7b688611af912e12ee76..d1a18abda4917a0a1a6d04b13c9e1dea0e7f2e99 100644 (file)
@@ -2,7 +2,7 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-//go:build unix
+//go:build unix || wasip1
 
 package poll
 
similarity index 98%
rename from src/internal/poll/fd_poll_wasm.go
rename to src/internal/poll/fd_poll_js.go
index b5158eba301213161413bb2cb921dc47d97c903e..fe5e73a149f6508a03aadbcd5e2e52562dfd635f 100644 (file)
@@ -2,7 +2,7 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-//go:build (js && wasm) || wasip1
+//go:build js && wasm
 
 package poll
 
index 0a2e76d73f224197330ed50b5eabd08476243555..b51535ecf2f0a729e54e6abf6b10ea6797c84769 100644 (file)
@@ -2,7 +2,7 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-//go:build unix || windows
+//go:build unix || windows || wasip1
 
 package poll
 
diff --git a/src/runtime/internal/wasitest/host_test.go b/src/runtime/internal/wasitest/host_test.go
new file mode 100644 (file)
index 0000000..ca4ef8f
--- /dev/null
@@ -0,0 +1,14 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package wasi_test
+
+import "flag"
+
+var target string
+
+func init() {
+       // The dist test runner passes -target when running this as a host test.
+       flag.StringVar(&target, "target", "", "")
+}
diff --git a/src/runtime/internal/wasitest/nonblock_test.go b/src/runtime/internal/wasitest/nonblock_test.go
new file mode 100644 (file)
index 0000000..887baab
--- /dev/null
@@ -0,0 +1,96 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package wasi_test
+
+import (
+       "bufio"
+       "fmt"
+       "io"
+       "math/rand"
+       "os"
+       "os/exec"
+       "path/filepath"
+       "syscall"
+       "testing"
+)
+
+// This test creates a set of FIFOs and writes to them in reverse order. It
+// checks that the output order matches the write order. The test binary opens
+// the FIFOs in their original order and spawns a goroutine for each that reads
+// from the FIFO and writes the result to stderr. If I/O was blocking, all
+// goroutines would be blocked waiting for one read call to return, and the
+// output order wouldn't match.
+
+type fifo struct {
+       file *os.File
+       path string
+}
+
+func TestNonblock(t *testing.T) {
+       if target != "wasip1/wasm" {
+               t.Skip()
+       }
+
+       switch os.Getenv("GOWASIRUNTIME") {
+       case "wazero", "":
+               t.Skip("wazero does not support non-blocking I/O")
+       case "wasmer":
+               t.Skip("wasmer does not support non-blocking I/O")
+       }
+
+       args := []string{"run", "./testdata/nonblock.go"}
+
+       fifos := make([]*fifo, 8)
+       for i := range fifos {
+               path := filepath.Join(t.TempDir(), fmt.Sprintf("wasip1-nonblock-fifo-%d-%d", rand.Uint32(), i))
+               if err := syscall.Mkfifo(path, 0666); err != nil {
+                       t.Fatal(err)
+               }
+
+               file, err := os.OpenFile(path, os.O_RDWR, 0)
+               if err != nil {
+                       t.Fatal(err)
+               }
+               defer file.Close()
+
+               args = append(args, path)
+               fifos[len(fifos)-i-1] = &fifo{file, path}
+       }
+
+       subProcess := exec.Command("go", args...)
+
+       subProcess.Env = append(os.Environ(), "GOOS=wasip1", "GOARCH=wasm")
+
+       pr, pw := io.Pipe()
+       defer pw.Close()
+
+       subProcess.Stderr = pw
+
+       if err := subProcess.Start(); err != nil {
+               t.Fatal(err)
+       }
+
+       scanner := bufio.NewScanner(pr)
+       if !scanner.Scan() {
+               t.Fatal("expected line:", scanner.Err())
+       } else if scanner.Text() != "waiting" {
+               t.Fatal("unexpected output:", scanner.Text())
+       }
+
+       for _, fifo := range fifos {
+               if _, err := fifo.file.WriteString(fifo.path + "\n"); err != nil {
+                       t.Fatal(err)
+               }
+               if !scanner.Scan() {
+                       t.Fatal("expected line:", scanner.Err())
+               } else if scanner.Text() != fifo.path {
+                       t.Fatal("unexpected line:", scanner.Text())
+               }
+       }
+
+       if err := subProcess.Wait(); err != nil {
+               t.Fatal(err)
+       }
+}
diff --git a/src/runtime/internal/wasitest/testdata/nonblock.go b/src/runtime/internal/wasitest/testdata/nonblock.go
new file mode 100644 (file)
index 0000000..947abe7
--- /dev/null
@@ -0,0 +1,48 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+       "os"
+       "sync"
+)
+
+func main() {
+       ready := make(chan struct{})
+
+       var wg sync.WaitGroup
+       for _, path := range os.Args[1:] {
+               f, err := os.Open(path)
+               if err != nil {
+                       panic(err)
+               }
+
+               spawnWait := make(chan struct{})
+
+               wg.Add(1)
+               go func(f *os.File) {
+                       defer f.Close()
+                       defer wg.Done()
+
+                       close(spawnWait)
+
+                       <-ready
+
+                       var buf [256]byte
+                       n, err := f.Read(buf[:])
+                       if err != nil {
+                               panic(err)
+                       }
+                       os.Stderr.Write(buf[:n])
+               }(f)
+
+               // Spawn one goroutine at a time.
+               <-spawnWait
+       }
+
+       println("waiting")
+       close(ready)
+       wg.Wait()
+}
index 3e6a6961e37097816ed11c39ad78f20e36c17c3f..9b54e8e21f2dc30b7a73ee8c02eab45f12e99269 100644 (file)
@@ -336,8 +336,8 @@ func poll_runtime_pollWait(pd *pollDesc, mode int) int {
        if errcode != pollNoError {
                return errcode
        }
-       // As for now only Solaris, illumos, and AIX use level-triggered IO.
-       if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
+       // As for now only Solaris, illumos, AIX and wasip1 use level-triggered IO.
+       if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" || GOOS == "wasip1" {
                netpollarm(pd, mode)
        }
        for !netpollblock(pd, int32(mode), false) {
index 5782c7851510b077d61bc386f9351f4dd01135e3..5319561779158d0d96bc3aa36799c2119af14417 100644 (file)
@@ -2,10 +2,10 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-// Fake network poller for js/wasm and wasip1/wasm.
-// Should never be used, because wasm network connections do not honor "SetNonblock".
+// Fake network poller for js/wasm.
+// Should never be used, because js/wasm network connections do not honor "SetNonblock".
 
-//go:build (js && wasm) || wasip1
+//go:build js && wasm
 
 package runtime
 
diff --git a/src/runtime/netpoll_wasip1.go b/src/runtime/netpoll_wasip1.go
new file mode 100644 (file)
index 0000000..677287b
--- /dev/null
@@ -0,0 +1,254 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+//go:build wasip1
+
+package runtime
+
+import "unsafe"
+
+// WASI network poller.
+//
+// WASI preview 1 includes a poll_oneoff host function that behaves similarly
+// to poll(2) on Linux. Like poll(2), poll_oneoff is level triggered. It
+// accepts one or more subscriptions to FD read or write events.
+//
+// Major differences to poll(2):
+// - the events are not written to the input entries (like pollfd.revents), and
+//   instead are appended to a separate events buffer. poll_oneoff writes zero
+//   or more events to the buffer (at most one per input subscription) and
+//   returns the number of events written. Although the index of the
+//   subscriptions might not match the index of the associated event in the
+//   events buffer, both the subscription and event structs contain a userdata
+//   field and when a subscription yields an event the userdata fields will
+//   match.
+// - there's no explicit timeout parameter, although a time limit can be added
+//   by using "clock" subscriptions.
+// - each FD subscription can either be for a read or a write, but not both.
+//   This is in contrast to poll(2) which accepts a mask with POLLIN and
+//   POLLOUT bits, allowing for a subscription to either, neither, or both
+//   reads and writes.
+//
+// Since poll_oneoff is similar to poll(2), the implementation here was derived
+// from netpoll_aix.go.
+
+const _EINTR = 27
+
+var (
+       evts []event
+       subs []subscription
+       pds  []*pollDesc
+       mtx  mutex
+)
+
+func netpollinit() {
+       // Unlike poll(2), WASI's poll_oneoff doesn't accept a timeout directly. To
+       // prevent it from blocking indefinitely, a clock subscription with a
+       // timeout field needs to be submitted. Reserve a slot here for the clock
+       // subscription, and set fields that won't change between poll_oneoff calls.
+
+       subs = make([]subscription, 1, 128)
+       evts = make([]event, 0, 128)
+       pds = make([]*pollDesc, 0, 128)
+
+       timeout := &subs[0]
+       eventtype := timeout.u.eventtype()
+       *eventtype = eventtypeClock
+       clock := timeout.u.subscriptionClock()
+       clock.id = clockMonotonic
+       clock.precision = 1e3
+}
+
+func netpollIsPollDescriptor(fd uintptr) bool {
+       return false
+}
+
+func netpollopen(fd uintptr, pd *pollDesc) int32 {
+       lock(&mtx)
+
+       // We don't worry about pd.fdseq here,
+       // as mtx protects us from stale pollDescs.
+
+       pds = append(pds, pd)
+
+       // The 32-bit pd.user field holds the index of the read subscription in the
+       // upper 16 bits, and index of the write subscription in the lower bits.
+       // A disarmed=^uint16(0) sentinel is used to represent no subscription.
+       // There is thus a maximum of 65535 total subscriptions.
+       pd.user = uint32(disarmed)<<16 | uint32(disarmed)
+
+       unlock(&mtx)
+       return 0
+}
+
+const disarmed = 0xFFFF
+
+func netpollarm(pd *pollDesc, mode int) {
+       lock(&mtx)
+
+       var s subscription
+
+       s.userdata = userdata(uintptr(unsafe.Pointer(pd)))
+
+       fdReadwrite := s.u.subscriptionFdReadwrite()
+       fdReadwrite.fd = int32(pd.fd)
+
+       ridx := int(pd.user >> 16)
+       widx := int(pd.user & 0xFFFF)
+
+       if (mode == 'r' && ridx != disarmed) || (mode == 'w' && widx != disarmed) {
+               unlock(&mtx)
+               return
+       }
+
+       eventtype := s.u.eventtype()
+       switch mode {
+       case 'r':
+               *eventtype = eventtypeFdRead
+               ridx = len(subs)
+       case 'w':
+               *eventtype = eventtypeFdWrite
+               widx = len(subs)
+       }
+
+       if len(subs) == disarmed {
+               throw("overflow")
+       }
+
+       pd.user = uint32(ridx)<<16 | uint32(widx)
+
+       subs = append(subs, s)
+       evts = append(evts, event{})
+
+       unlock(&mtx)
+}
+
+func netpolldisarm(pd *pollDesc, mode int32) {
+       switch mode {
+       case 'r':
+               removesub(int(pd.user >> 16))
+       case 'w':
+               removesub(int(pd.user & 0xFFFF))
+       case 'r' + 'w':
+               removesub(int(pd.user >> 16))
+               removesub(int(pd.user & 0xFFFF))
+       }
+}
+
+func removesub(i int) {
+       if i == disarmed {
+               return
+       }
+       j := len(subs) - 1
+
+       pdi := (*pollDesc)(unsafe.Pointer(uintptr(subs[i].userdata)))
+       pdj := (*pollDesc)(unsafe.Pointer(uintptr(subs[j].userdata)))
+
+       swapsub(pdi, i, disarmed)
+       swapsub(pdj, j, i)
+
+       subs = subs[:j]
+}
+
+func swapsub(pd *pollDesc, from, to int) {
+       if from == to {
+               return
+       }
+       ridx := int(pd.user >> 16)
+       widx := int(pd.user & 0xFFFF)
+       if ridx == from {
+               ridx = to
+       } else if widx == from {
+               widx = to
+       }
+       pd.user = uint32(ridx)<<16 | uint32(widx)
+       if to != disarmed {
+               subs[to], subs[from] = subs[from], subs[to]
+       }
+}
+
+func netpollclose(fd uintptr) int32 {
+       lock(&mtx)
+       for i := 0; i < len(pds); i++ {
+               if pds[i].fd == fd {
+                       netpolldisarm(pds[i], 'r'+'w')
+                       pds[i] = pds[len(pds)-1]
+                       pds = pds[:len(pds)-1]
+                       break
+               }
+       }
+       unlock(&mtx)
+       return 0
+}
+
+func netpollBreak() {}
+
+func netpoll(delay int64) gList {
+       lock(&mtx)
+
+       // If delay >= 0, we include a subscription of type Clock that we use as
+       // a timeout. If delay < 0, we omit the subscription and allow poll_oneoff
+       // to block indefinitely.
+       pollsubs := subs
+       if delay >= 0 {
+               timeout := &subs[0]
+               clock := timeout.u.subscriptionClock()
+               clock.timeout = uint64(delay)
+       } else {
+               pollsubs = subs[1:]
+       }
+
+       if len(pollsubs) == 0 {
+               unlock(&mtx)
+               return gList{}
+       }
+
+       evts = evts[:len(pollsubs)]
+       for i := range evts {
+               evts[i] = event{}
+       }
+
+retry:
+       var nevents size
+       errno := poll_oneoff(unsafe.Pointer(&pollsubs[0]), unsafe.Pointer(&evts[0]), uint32(len(pollsubs)), unsafe.Pointer(&nevents))
+       if errno != 0 {
+               if errno != _EINTR {
+                       println("errno=", errno, " len(pollsubs)=", len(pollsubs))
+                       throw("poll_oneoff failed")
+               }
+               // If a timed sleep was interrupted, just return to
+               // recalculate how long we should sleep now.
+               if delay > 0 {
+                       unlock(&mtx)
+                       return gList{}
+               }
+               goto retry
+       }
+
+       var toRun gList
+       for i := 0; i < int(nevents); i++ {
+               e := &evts[i]
+               if e.typ == eventtypeClock {
+                       continue
+               }
+
+               hangup := e.fdReadwrite.flags&fdReadwriteHangup != 0
+               var mode int32
+               if e.typ == eventtypeFdRead || e.error != 0 || hangup {
+                       mode += 'r'
+               }
+               if e.typ == eventtypeFdWrite || e.error != 0 || hangup {
+                       mode += 'w'
+               }
+               if mode != 0 {
+                       pd := (*pollDesc)(unsafe.Pointer(uintptr(e.userdata)))
+                       netpolldisarm(pd, mode)
+                       pd.setEventErr(e.error != 0, 0)
+                       netpollready(&toRun, pd, mode)
+               }
+       }
+
+       unlock(&mtx)
+       return toRun
+}
index 577d9652dded007cd82faa1502429a3d6ae3cef1..8811bb617854b878292b6d741f3ffa69dcac62b5 100644 (file)
@@ -123,6 +123,10 @@ type subscriptionClock struct {
        flags     subclockflags
 }
 
+type subscriptionFdReadwrite struct {
+       fd int32
+}
+
 type subscription struct {
        userdata userdata
        u        subscriptionUnion
@@ -138,6 +142,10 @@ func (u *subscriptionUnion) subscriptionClock() *subscriptionClock {
        return (*subscriptionClock)(unsafe.Pointer(&u[1]))
 }
 
+func (u *subscriptionUnion) subscriptionFdReadwrite() *subscriptionFdReadwrite {
+       return (*subscriptionFdReadwrite)(unsafe.Pointer(&u[1]))
+}
+
 //go:wasmimport wasi_snapshot_preview1 poll_oneoff
 //go:noescape
 func poll_oneoff(in, out unsafe.Pointer, nsubscriptions size, nevents unsafe.Pointer) errno