1 use wasmtime::component::{Component, FutureAny, FutureReader, Linker, StreamAny, StreamReader};
2 use wasmtime::{Config, Engine, Result, Store};
3 
/// Host-only check of conversions between the typed readers
/// (`FutureReader<T>` / `StreamReader<T>`) and their type-erased
/// counterparts (`FutureAny` / `StreamAny`).
///
/// For both a future and a stream carrying `u32`, the test erases the
/// payload type, asserts that recovering it as `()` or `u64` fails, recovers
/// it as `u32`, erases it once more and finally closes the erased handle.
#[test]
fn simple_type_conversions() -> Result<()> {
    // Enable the component-model async feature on the engine.
    let engine = Engine::new(Config::new().wasm_component_model_async(true))?;
    let mut store = Store::new(&engine, ());

    // Future: typed -> erased -> (wrong types rejected) -> typed -> erased -> close.
    let typed_future = FutureReader::new(&mut store, async { wasmtime::error::Ok(10_u32) })?;
    let erased_future = typed_future.try_into_future_any(&mut store).unwrap();
    assert!(erased_future.clone().try_into_future_reader::<()>().is_err());
    assert!(erased_future.clone().try_into_future_reader::<u64>().is_err());
    let typed_future = erased_future.try_into_future_reader::<u32>().unwrap();
    typed_future
        .try_into_future_any(&mut store)
        .unwrap()
        .close(&mut store)?;

    // Stream: the same sequence of conversions as for the future above.
    let typed_stream = StreamReader::new(&mut store, vec![10_u32])?;
    let erased_stream = typed_stream.try_into_stream_any(&mut store).unwrap();
    assert!(erased_stream.clone().try_into_stream_reader::<()>().is_err());
    assert!(erased_stream.clone().try_into_stream_reader::<u64>().is_err());
    let typed_stream = erased_stream.try_into_stream_reader::<u32>().unwrap();
    typed_stream
        .try_into_stream_any(&mut store)
        .unwrap()
        .close(&mut store)?;

    Ok(())
}
31 
/// Checks which Rust signatures `get_typed_func` accepts for component
/// exports that take and/or return `future<u32>` / `stream<u32>`, then
/// round-trips real handles through those exports.
///
/// The guest component exports:
/// * `f` / `s`: lifted from the core function `x`, which returns its single
///   `i32` parameter unchanged;
/// * `mk-f` / `mk-s`: call `future.new` / `stream.new` and return the low
///   32 bits of the `i64` result.
#[test]
#[cfg_attr(miri, ignore)]
fn simple_type_assertions() -> Result<()> {
    // Enable the component-model async feature on the engine.
    let engine = Engine::new(Config::new().wasm_component_model_async(true))?;
    let mut store = Store::new(&engine, ());

    let guest = Component::new(
        &engine,
        r#"
        (component
            (type $f (future u32))
            (type $s (stream u32))
            (core func $mk-f (canon future.new $f))
            (core func $mk-s (canon stream.new $s))

            (core module $m
                (import "" "mk-f" (func $mk-f (result i64)))
                (import "" "mk-s" (func $mk-s (result i64)))

                (func (export "x") (param i32) (result i32) local.get 0)

                (func (export "mk-f") (result i32)
                    (i32.wrap_i64 (call $mk-f)))
                (func (export "mk-s") (result i32)
                    (i32.wrap_i64 (call $mk-s)))
            )
            (core instance $i (instantiate $m
                (with "" (instance
                    (export "mk-f" (func $mk-f))
                    (export "mk-s" (func $mk-s))
                ))
            ))
            (func (export "f") (param "f" $f) (result $f)
                (canon lift (core func $i "x")))
            (func (export "s") (param "s" $s) (result $s)
                (canon lift (core func $i "x")))
            (func (export "mk-f") (result $f)
                (canon lift (core func $i "mk-f")))
            (func (export "mk-s") (result $s)
                (canon lift (core func $i "mk-s")))
        )
        "#,
    )?;

    let instance = Linker::new(&engine).instantiate(&mut store, &guest)?;

    // `f` must be obtainable with every typed/erased combination of
    // parameter and result.
    let fut_typed_typed =
        instance.get_typed_func::<(FutureReader<u32>,), (FutureReader<u32>,)>(&mut store, "f")?;
    let fut_typed_any =
        instance.get_typed_func::<(FutureReader<u32>,), (FutureAny,)>(&mut store, "f")?;
    let fut_any_typed =
        instance.get_typed_func::<(FutureAny,), (FutureReader<u32>,)>(&mut store, "f")?;
    let fut_any_any = instance.get_typed_func::<(FutureAny,), (FutureAny,)>(&mut store, "f")?;

    // Same four combinations for `s`.
    let strm_typed_typed =
        instance.get_typed_func::<(StreamReader<u32>,), (StreamReader<u32>,)>(&mut store, "s")?;
    let strm_typed_any =
        instance.get_typed_func::<(StreamReader<u32>,), (StreamAny,)>(&mut store, "s")?;
    let strm_any_typed =
        instance.get_typed_func::<(StreamAny,), (StreamReader<u32>,)>(&mut store, "s")?;
    let strm_any_any = instance.get_typed_func::<(StreamAny,), (StreamAny,)>(&mut store, "s")?;

    // The constructors can hand back either a typed or an erased handle.
    let new_fut_typed = instance.get_typed_func::<(), (FutureReader<u32>,)>(&mut store, "mk-f")?;
    let new_fut_any = instance.get_typed_func::<(), (FutureAny,)>(&mut store, "mk-f")?;
    let new_strm_typed = instance.get_typed_func::<(), (StreamReader<u32>,)>(&mut store, "mk-s")?;
    let new_strm_any = instance.get_typed_func::<(), (StreamAny,)>(&mut store, "mk-s")?;

    // Mismatched signatures for `f` must be rejected: wrong arity, a
    // non-future parameter or result, and a wrong payload type.
    assert!(instance.get_typed_func::<(), ()>(&mut store, "f").is_err());
    assert!(
        instance
            .get_typed_func::<(u32,), (FutureReader<u32>,)>(&mut store, "f")
            .is_err()
    );
    assert!(
        instance
            .get_typed_func::<(FutureReader<u32>,), (u32,)>(&mut store, "f")
            .is_err()
    );
    assert!(
        instance
            .get_typed_func::<(FutureReader<()>,), (FutureReader<u32>,)>(&mut store, "f")
            .is_err()
    );
    assert!(
        instance
            .get_typed_func::<(FutureReader<u64>,), (FutureReader<u32>,)>(&mut store, "f")
            .is_err()
    );

    // The same mismatches must be rejected for `s`.
    assert!(instance.get_typed_func::<(), ()>(&mut store, "s").is_err());
    assert!(
        instance
            .get_typed_func::<(u32,), (StreamReader<u32>,)>(&mut store, "s")
            .is_err()
    );
    assert!(
        instance
            .get_typed_func::<(StreamReader<u32>,), (u32,)>(&mut store, "s")
            .is_err()
    );
    assert!(
        instance
            .get_typed_func::<(StreamReader<()>,), (StreamReader<u32>,)>(&mut store, "s")
            .is_err()
    );
    assert!(
        instance
            .get_typed_func::<(StreamReader<u64>,), (StreamReader<u32>,)>(&mut store, "s")
            .is_err()
    );

    // Pass a future through `f` via typed->typed, typed->any, any->any and
    // any->typed in turn, then close the resulting typed reader.
    let roundtrip_future = |store: &mut Store<()>, reader: FutureReader<u32>| -> Result<()> {
        let (typed,) = fut_typed_typed.call(&mut *store, (reader,))?;
        let (erased,) = fut_typed_any.call(&mut *store, (typed,))?;
        let (erased,) = fut_any_any.call(&mut *store, (erased,))?;
        let (mut typed,) = fut_any_typed.call(&mut *store, (erased,))?;
        typed.close(&mut *store)?;
        Ok(())
    };

    // Host-created future.
    let host_future = FutureReader::new(&mut store, async { wasmtime::error::Ok(10_u32) })?;
    roundtrip_future(&mut store, host_future)?;

    // Guest-created future, received already typed.
    let (guest_future,) = new_fut_typed.call(&mut store, ())?;
    roundtrip_future(&mut store, guest_future)?;

    // Guest-created future, received erased and then given its type back.
    let (guest_future,) = new_fut_any.call(&mut store, ())?;
    let guest_future = guest_future.try_into_future_reader::<u32>()?;
    roundtrip_future(&mut store, guest_future)?;

    // Stream counterpart of `roundtrip_future`, going through `s`.
    let roundtrip_stream = |store: &mut Store<()>, reader: StreamReader<u32>| -> Result<()> {
        let (typed,) = strm_typed_typed.call(&mut *store, (reader,))?;
        let (erased,) = strm_typed_any.call(&mut *store, (typed,))?;
        let (erased,) = strm_any_any.call(&mut *store, (erased,))?;
        let (mut typed,) = strm_any_typed.call(&mut *store, (erased,))?;
        typed.close(&mut *store)?;
        Ok(())
    };

    // Host-created stream.
    let host_stream = StreamReader::new(&mut store, vec![10_u32])?;
    roundtrip_stream(&mut store, host_stream)?;

    // Guest-created stream, received already typed.
    let (guest_stream,) = new_strm_typed.call(&mut store, ())?;
    roundtrip_stream(&mut store, guest_stream)?;

    // Guest-created stream, received erased and then given its type back.
    let (guest_stream,) = new_strm_any.call(&mut store, ())?;
    let guest_stream = guest_stream.try_into_stream_reader::<u32>()?;
    roundtrip_stream(&mut store, guest_stream)?;

    Ok(())
}
181 
182 #[tokio::test]
183 #[cfg_attr(miri, ignore)]
stream_any_smoke() -> Result<()>184 async fn stream_any_smoke() -> Result<()> {
185     let mut config = Config::new();
186     config.wasm_component_model_async(true);
187     let engine = Engine::new(&config)?;
188     let mut store = Store::new(&engine, ());
189     let component = Component::new(
190         &engine,
191         r#"
192 (component
193     (type $s (stream u8))
194 
195     (core module $libc (memory (export "mem") 1))
196     (core instance $libc (instantiate $libc))
197 
198     (core module $m
199         (import "" "stream.new" (func $stream.new (result i64)))
200         (import "" "stream.write" (func $stream.write (param i32 i32 i32) (result i32)))
201         (import "" "task.return" (func $task.return))
202         (import "" "waitable-set.new" (func $waitable-set.new (result i32)))
203         (import "" "waitable.join" (func $waitable.join (param i32 i32)))
204         (import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32)))
205         (import "" "waitable-set.drop" (func $waitable-set.drop (param i32)))
206         (import "" "mem" (memory 1))
207 
208         (global $w (mut i32) (i32.const 0))
209 
210         (func (export "mk") (result i32)
211             (local $r i32) (local $tmp i64)
212             (local.set $tmp (call $stream.new))
213             (local.set $r (i32.wrap_i64 (local.get $tmp)))
214             (global.set $w (i32.wrap_i64 (i64.shr_u (local.get $tmp) (i64.const 32))))
215             local.get $r
216         )
217 
218         (func (export "run") (result i32)
219             (local $ws i32)
220             (local.set $ws (call $waitable-set.new))
221             (call $waitable.join (global.get $w) (local.get $ws))
222             (call $waitable-set.wait (local.get $ws) (i32.const 0))
223             i32.const 3 ;; EVENT_STREAM_WRITE
224             i32.ne
225             if unreachable end
226 
227             (if (i32.ne (i32.load (i32.const 0)) (global.get $w))
228               (then unreachable))
229             (if (i32.ne (i32.load (i32.const 4)) (i32.const 1)) ;; DROPPED | (0 << 4)
230               (then unreachable))
231 
232             call $task.return
233 
234             i32.const 0 ;; CALLBACK_CODE_EXIT
235         )
236 
237         (func (export "cb") (param i32 i32 i32) (result i32) unreachable)
238     )
239     (core func $stream.new (canon stream.new $s))
240     (core func $stream.write (canon stream.write $s (memory $libc "mem")))
241     (core func $task.return (canon task.return))
242     (core func $waitable-set.new (canon waitable-set.new))
243     (core func $waitable.join (canon waitable.join))
244     (core func $waitable-set.wait (canon waitable-set.wait (memory $libc "mem")))
245     (core func $waitable-set.drop (canon waitable-set.drop))
246     (core instance $i (instantiate $m
247         (with "" (instance
248             (export "stream.new" (func $stream.new))
249             (export "stream.write" (func $stream.write))
250             (export "task.return" (func $task.return))
251             (export "waitable-set.new" (func $waitable-set.new))
252             (export "waitable.join" (func $waitable.join))
253             (export "waitable-set.wait" (func $waitable-set.wait))
254             (export "waitable-set.drop" (func $waitable-set.drop))
255             (export "mem" (memory $libc "mem"))
256         ))
257     ))
258     (func (export "mk") (result (stream u8))
259         (canon lift (core func $i "mk")))
260     (func (export "run") async
261         (canon lift (core func $i "run") async (callback (func $i "cb"))))
262 )
263         "#,
264     )?;
265     let instance = Linker::new(&engine).instantiate(&mut store, &component)?;
266     let mk = instance.get_typed_func::<(), (StreamAny,)>(&mut store, "mk")?;
267     let run = instance.get_typed_func::<(), ()>(&mut store, "run")?;
268     store
269         .run_concurrent(async |store| {
270             let (mut stream,) = mk.call_concurrent(store, ()).await?;
271             tokio::try_join! {
272                 async {
273                     run.call_concurrent(store, ()).await?;
274                     wasmtime::error::Ok(())
275                 },
276                 async {
277                     store.with(|store| stream.close(store))?;
278                     wasmtime::error::Ok(())
279                 }
280             }?;
281             wasmtime::error::Ok(())
282         })
283         .await??;
284     Ok(())
285 }
286