1 //! Implementation of [`FutureAny`] and [`StreamAny`].
2 
3 use crate::component::concurrent::futures_and_streams::{self, TransmitOrigin};
4 use crate::component::concurrent::{TableId, TransmitHandle};
5 use crate::component::func::{LiftContext, LowerContext, bad_type_info, desc};
6 use crate::component::matching::InstanceType;
7 use crate::component::types::{self, FutureType, StreamType};
8 use crate::component::{
9     ComponentInstanceId, ComponentType, FutureReader, Lift, Lower, StreamReader,
10 };
11 use crate::store::StoreOpaque;
12 use crate::{AsContextMut, Result, bail, error::Context};
13 use std::any::TypeId;
14 use std::mem::MaybeUninit;
15 use wasmtime_environ::component::{
16     CanonicalAbiInfo, InterfaceType, TypeFutureTableIndex, TypeStreamTableIndex,
17 };
18 
/// Represents a type-erased component model `future`.
///
/// This type is similar to [`ResourceAny`](crate::component::ResourceAny)
/// in that it statically guarantees that it represents a component model
/// `future`, but it carries no static information about the payload type
/// associated with that future. It is intended for "dynamically typed"
/// situations where embedders may not know ahead of time the type of a
/// `future` used by a component that is loaded.
///
/// # Closing futures
///
/// A [`FutureAny`] represents a resource that is owned by a [`Store`]. Proper
/// disposal of a future requires invoking the [`FutureAny::close`] method to
/// ensure that this handle does not leak. If [`FutureAny::close`] is not
/// called then memory will not be leaked once the owning [`Store`] is dropped,
/// but the resource handle will be leaked until the [`Store`] is dropped.
///
/// [`Store`]: crate::Store
#[derive(Debug, Clone, PartialEq)]
pub struct FutureAny {
    // Handle identifying the underlying transmit state; this is what gets
    // lowered to a guest index and what `close` operates on.
    id: TableId<TransmitHandle>,
    // Dynamically tracked payload type, checked when lowering into a guest or
    // when converting to a statically typed `FutureReader<T>`.
    ty: PayloadType<FutureType>,
}
42 
43 impl FutureAny {
lower_to_index<T>(&self, cx: &mut LowerContext<'_, T>, ty: InterfaceType) -> Result<u32>44     fn lower_to_index<T>(&self, cx: &mut LowerContext<'_, T>, ty: InterfaceType) -> Result<u32> {
45         // Note that unlike `FutureReader<T>` we need to perform an extra
46         // typecheck to ensure that the dynamic type of this future matches
47         // what the guest we're lowering into expects. This couldn't happen
48         // before this point (see the `ComponentType::typecheck` implementation
49         // for this type), so do it now.
50         let future_ty = match ty {
51             InterfaceType::Future(payload) => payload,
52             _ => bad_type_info(),
53         };
54         let payload = cx.types[cx.types[future_ty].ty].payload.as_ref();
55         self.ty.typecheck_guest(
56             &cx.instance_type(),
57             payload,
58             FutureType::equivalent_payload_guest,
59         )?;
60 
61         // Like `FutureReader<T>`, however, lowering "just" gets a u32.
62         futures_and_streams::lower_future_to_index(self.id, cx, ty)
63     }
64 
65     /// Attempts to convert this [`FutureAny`] to a [`FutureReader<T>`]
66     /// with a statically known type.
67     ///
68     /// # Errors
69     ///
70     /// This function will return an error if `T` does not match the type of
71     /// value on this future.
try_into_future_reader<T>(self) -> Result<FutureReader<T>> where T: ComponentType + 'static,72     pub fn try_into_future_reader<T>(self) -> Result<FutureReader<T>>
73     where
74         T: ComponentType + 'static,
75     {
76         self.ty
77             .typecheck_host::<T>(FutureType::equivalent_payload_host::<T>)?;
78         Ok(FutureReader::new_(self.id))
79     }
80 
81     /// Attempts to convert `reader` to a [`FutureAny`], erasing its statically
82     /// known type.
83     ///
84     /// # Errors
85     ///
86     /// This function will return an error if `reader` does not belong to
87     /// `store`.
try_from_future_reader<T>( mut store: impl AsContextMut, reader: FutureReader<T>, ) -> Result<Self> where T: ComponentType + 'static,88     pub fn try_from_future_reader<T>(
89         mut store: impl AsContextMut,
90         reader: FutureReader<T>,
91     ) -> Result<Self>
92     where
93         T: ComponentType + 'static,
94     {
95         let store = store.as_context_mut();
96         let ty = match store.0.transmit_origin(reader.id())? {
97             TransmitOrigin::Host => PayloadType::new_host::<T>(),
98             TransmitOrigin::GuestFuture(id, ty) => PayloadType::new_guest_future(store.0, id, ty),
99             TransmitOrigin::GuestStream(..) => bail!("not a future"),
100         };
101         Ok(FutureAny {
102             id: reader.id(),
103             ty,
104         })
105     }
106 
lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self>107     fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
108         let id = futures_and_streams::lift_index_to_future(cx, ty, index)?;
109         let InterfaceType::Future(ty) = ty else {
110             unreachable!()
111         };
112         let ty = cx.types[ty].ty;
113         Ok(FutureAny {
114             id,
115             // Note that this future might actually be a host-originating
116             // future which means that this ascription of "the type is the
117             // guest" may be slightly in accurate. The guest, however, has the
118             // most accurate view of what type this future has so that should
119             // be reasonable to ascribe as the type here regardless.
120             ty: PayloadType::Guest(FutureType::from(ty, &cx.instance_type())),
121         })
122     }
123 
124     /// Close this `FutureAny`.
125     ///
126     /// This will close this future and cause any write that happens later to
127     /// returned `DROPPED`.
128     ///
129     /// # Errors
130     ///
131     /// Returns an error if this future has already been closed.
132     ///
133     /// # Panics
134     ///
135     /// Panics if the `store` does not own this future.
close(&mut self, mut store: impl AsContextMut) -> Result<()>136     pub fn close(&mut self, mut store: impl AsContextMut) -> Result<()> {
137         futures_and_streams::future_close(store.as_context_mut().0, &mut self.id)
138     }
139 }
140 
141 unsafe impl ComponentType for FutureAny {
142     const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
143 
144     type Lower = <u32 as ComponentType>::Lower;
145 
typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()>146     fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
147         match ty {
148             InterfaceType::Future(_) => Ok(()),
149             other => bail!("expected `future`, found `{}`", desc(other)),
150         }
151     }
152 }
153 
154 unsafe impl Lower for FutureAny {
linear_lower_to_flat<T>( &self, cx: &mut LowerContext<'_, T>, ty: InterfaceType, dst: &mut MaybeUninit<Self::Lower>, ) -> Result<()>155     fn linear_lower_to_flat<T>(
156         &self,
157         cx: &mut LowerContext<'_, T>,
158         ty: InterfaceType,
159         dst: &mut MaybeUninit<Self::Lower>,
160     ) -> Result<()> {
161         self.lower_to_index(cx, ty)?
162             .linear_lower_to_flat(cx, InterfaceType::U32, dst)
163     }
164 
linear_lower_to_memory<T>( &self, cx: &mut LowerContext<'_, T>, ty: InterfaceType, offset: usize, ) -> Result<()>165     fn linear_lower_to_memory<T>(
166         &self,
167         cx: &mut LowerContext<'_, T>,
168         ty: InterfaceType,
169         offset: usize,
170     ) -> Result<()> {
171         self.lower_to_index(cx, ty)?
172             .linear_lower_to_memory(cx, InterfaceType::U32, offset)
173     }
174 }
175 
176 unsafe impl Lift for FutureAny {
linear_lift_from_flat( cx: &mut LiftContext<'_>, ty: InterfaceType, src: &Self::Lower, ) -> Result<Self>177     fn linear_lift_from_flat(
178         cx: &mut LiftContext<'_>,
179         ty: InterfaceType,
180         src: &Self::Lower,
181     ) -> Result<Self> {
182         let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
183         Self::lift_from_index(cx, ty, index)
184     }
185 
linear_lift_from_memory( cx: &mut LiftContext<'_>, ty: InterfaceType, bytes: &[u8], ) -> Result<Self>186     fn linear_lift_from_memory(
187         cx: &mut LiftContext<'_>,
188         ty: InterfaceType,
189         bytes: &[u8],
190     ) -> Result<Self> {
191         let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
192         Self::lift_from_index(cx, ty, index)
193     }
194 }
195 
/// Represents a type-erased component model `stream`.
///
/// This type is similar to [`ResourceAny`](crate::component::ResourceAny)
/// in that it statically guarantees that it represents a component model
/// `stream`, but it carries no static information about the payload type
/// associated with that stream. It is intended for "dynamically typed"
/// situations where embedders may not know ahead of time the type of a
/// `stream` used by a component that is loaded.
///
/// # Closing streams
///
/// A [`StreamAny`] represents a resource that is owned by a [`Store`]. Proper
/// disposal of a stream requires invoking the [`StreamAny::close`] method to
/// ensure that this handle does not leak. If [`StreamAny::close`] is not
/// called then memory will not be leaked once the owning [`Store`] is dropped,
/// but the resource handle will be leaked until the [`Store`] is dropped.
///
/// [`Store`]: crate::Store
#[derive(Debug, Clone, PartialEq)]
pub struct StreamAny {
    // Handle identifying the underlying transmit state; this is what gets
    // lowered to a guest index and what `close` operates on.
    id: TableId<TransmitHandle>,
    // Dynamically tracked payload type, checked when lowering into a guest or
    // when converting to a statically typed `StreamReader<T>`.
    ty: PayloadType<StreamType>,
}
219 
220 impl StreamAny {
lower_to_index<T>(&self, cx: &mut LowerContext<'_, T>, ty: InterfaceType) -> Result<u32>221     fn lower_to_index<T>(&self, cx: &mut LowerContext<'_, T>, ty: InterfaceType) -> Result<u32> {
222         // See comments in `FutureAny::lower_to_index` for why this is
223         // different from `StreamReader`'s implementation.
224         let stream_ty = match ty {
225             InterfaceType::Stream(payload) => payload,
226             _ => bad_type_info(),
227         };
228         let payload = cx.types[cx.types[stream_ty].ty].payload.as_ref();
229         self.ty.typecheck_guest(
230             &cx.instance_type(),
231             payload,
232             StreamType::equivalent_payload_guest,
233         )?;
234         futures_and_streams::lower_stream_to_index(self.id, cx, ty)
235     }
236 
237     /// Attempts to convert this [`StreamAny`] to a [`StreamReader<T>`]
238     /// with a statically known type.
239     ///
240     /// # Errors
241     ///
242     /// This function will return an error if `T` does not match the type of
243     /// value on this stream.
try_into_stream_reader<T>(self) -> Result<StreamReader<T>> where T: ComponentType + 'static,244     pub fn try_into_stream_reader<T>(self) -> Result<StreamReader<T>>
245     where
246         T: ComponentType + 'static,
247     {
248         self.ty
249             .typecheck_host::<T>(StreamType::equivalent_payload_host::<T>)?;
250         Ok(StreamReader::new_(self.id))
251     }
252 
253     /// Attempts to convert `reader` to a [`StreamAny`], erasing its statically
254     /// known type.
255     ///
256     /// # Errors
257     ///
258     /// This function will return an error if `reader` does not belong to
259     /// `store`.
try_from_stream_reader<T>( mut store: impl AsContextMut, reader: StreamReader<T>, ) -> Result<Self> where T: ComponentType + 'static,260     pub fn try_from_stream_reader<T>(
261         mut store: impl AsContextMut,
262         reader: StreamReader<T>,
263     ) -> Result<Self>
264     where
265         T: ComponentType + 'static,
266     {
267         let store = store.as_context_mut();
268         let ty = match store.0.transmit_origin(reader.id())? {
269             TransmitOrigin::Host => PayloadType::new_host::<T>(),
270             TransmitOrigin::GuestStream(id, ty) => PayloadType::new_guest_stream(store.0, id, ty),
271             TransmitOrigin::GuestFuture(..) => bail!("not a stream"),
272         };
273         Ok(StreamAny {
274             id: reader.id(),
275             ty,
276         })
277     }
278 
lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self>279     fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
280         let id = futures_and_streams::lift_index_to_stream(cx, ty, index)?;
281         let InterfaceType::Stream(ty) = ty else {
282             unreachable!()
283         };
284         let ty = cx.types[ty].ty;
285         Ok(StreamAny {
286             id,
287             // Note that this stream might actually be a host-originating, but
288             // see the documentation in `FutureAny::lift_from_index` for why
289             // this should be ok.
290             ty: PayloadType::Guest(StreamType::from(ty, &cx.instance_type())),
291         })
292     }
293 
294     /// Close this `StreamAny`.
295     ///
296     /// This will close this stream and cause any write that happens later to
297     /// returned `DROPPED`.
298     ///
299     /// # Errors
300     ///
301     /// Returns an error if this stream has already been closed.
302     ///
303     /// # Panics
304     ///
305     /// Panics if the `store` does not own this stream.
close(&mut self, mut store: impl AsContextMut) -> Result<()>306     pub fn close(&mut self, mut store: impl AsContextMut) -> Result<()> {
307         futures_and_streams::stream_close(store.as_context_mut().0, &mut self.id)
308     }
309 }
310 
311 unsafe impl ComponentType for StreamAny {
312     const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
313 
314     type Lower = <u32 as ComponentType>::Lower;
315 
typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()>316     fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
317         match ty {
318             InterfaceType::Stream(_) => Ok(()),
319             other => bail!("expected `stream`, found `{}`", desc(other)),
320         }
321     }
322 }
323 
324 unsafe impl Lower for StreamAny {
linear_lower_to_flat<T>( &self, cx: &mut LowerContext<'_, T>, ty: InterfaceType, dst: &mut MaybeUninit<Self::Lower>, ) -> Result<()>325     fn linear_lower_to_flat<T>(
326         &self,
327         cx: &mut LowerContext<'_, T>,
328         ty: InterfaceType,
329         dst: &mut MaybeUninit<Self::Lower>,
330     ) -> Result<()> {
331         self.lower_to_index(cx, ty)?
332             .linear_lower_to_flat(cx, InterfaceType::U32, dst)
333     }
334 
linear_lower_to_memory<T>( &self, cx: &mut LowerContext<'_, T>, ty: InterfaceType, offset: usize, ) -> Result<()>335     fn linear_lower_to_memory<T>(
336         &self,
337         cx: &mut LowerContext<'_, T>,
338         ty: InterfaceType,
339         offset: usize,
340     ) -> Result<()> {
341         self.lower_to_index(cx, ty)?
342             .linear_lower_to_memory(cx, InterfaceType::U32, offset)
343     }
344 }
345 
346 unsafe impl Lift for StreamAny {
linear_lift_from_flat( cx: &mut LiftContext<'_>, ty: InterfaceType, src: &Self::Lower, ) -> Result<Self>347     fn linear_lift_from_flat(
348         cx: &mut LiftContext<'_>,
349         ty: InterfaceType,
350         src: &Self::Lower,
351     ) -> Result<Self> {
352         let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
353         Self::lift_from_index(cx, ty, index)
354     }
355 
linear_lift_from_memory( cx: &mut LiftContext<'_>, ty: InterfaceType, bytes: &[u8], ) -> Result<Self>356     fn linear_lift_from_memory(
357         cx: &mut LiftContext<'_>,
358         ty: InterfaceType,
359         bytes: &[u8],
360     ) -> Result<Self> {
361         let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
362         Self::lift_from_index(cx, ty, index)
363     }
364 }
365 
/// Payload type information carried by [`FutureAny`] and [`StreamAny`].
///
/// `T` is the guest-side type descriptor; this file instantiates it with
/// [`FutureType`] and [`StreamType`].
#[derive(Debug, Clone)]
enum PayloadType<T> {
    /// The payload type is described by a component's own type information.
    /// This is used when a value is lifted from a guest or when the
    /// underlying transmit originated in a guest.
    Guest(T),
    /// The payload type is a host Rust type (see `PayloadType::new_host`).
    Host {
        /// `TypeId` of the host payload type, compared when converting back
        /// to a statically typed reader.
        id: TypeId,
        /// Checks the host payload type against the optional payload type a
        /// guest expects; populated with `types::typecheck_payload::<P>`.
        typecheck: fn(Option<&InterfaceType>, &InstanceType<'_>) -> Result<()>,
    },
}
374 
375 impl<T: PartialEq> PartialEq for PayloadType<T> {
eq(&self, other: &Self) -> bool376     fn eq(&self, other: &Self) -> bool {
377         match (self, other) {
378             (PayloadType::Guest(a), PayloadType::Guest(b)) => a == b,
379             (PayloadType::Guest(_), _) => false,
380             (PayloadType::Host { id: a_id, .. }, PayloadType::Host { id: b_id, .. }) => {
381                 a_id == b_id
382             }
383             (PayloadType::Host { .. }, _) => false,
384         }
385     }
386 }
387 
388 impl PayloadType<FutureType> {
new_guest_future( store: &StoreOpaque, id: ComponentInstanceId, ty: TypeFutureTableIndex, ) -> Self389     fn new_guest_future(
390         store: &StoreOpaque,
391         id: ComponentInstanceId,
392         ty: TypeFutureTableIndex,
393     ) -> Self {
394         let types = InstanceType::new(&store.component_instance(id));
395         let ty = types.types[ty].ty;
396         PayloadType::Guest(FutureType::from(ty, &types))
397     }
398 }
399 
400 impl PayloadType<StreamType> {
new_guest_stream( store: &StoreOpaque, id: ComponentInstanceId, ty: TypeStreamTableIndex, ) -> Self401     fn new_guest_stream(
402         store: &StoreOpaque,
403         id: ComponentInstanceId,
404         ty: TypeStreamTableIndex,
405     ) -> Self {
406         let types = InstanceType::new(&store.component_instance(id));
407         let ty = types.types[ty].ty;
408         PayloadType::Guest(StreamType::from(ty, &types))
409     }
410 }
411 
412 impl<T> PayloadType<T> {
new_host<P>() -> Self where P: ComponentType + 'static,413     fn new_host<P>() -> Self
414     where
415         P: ComponentType + 'static,
416     {
417         PayloadType::Host {
418             typecheck: types::typecheck_payload::<P>,
419             id: TypeId::of::<P>(),
420         }
421     }
422 
typecheck_guest( &self, types: &InstanceType<'_>, payload: Option<&InterfaceType>, equivalent: fn(&T, &InstanceType<'_>, Option<&InterfaceType>) -> bool, ) -> Result<()>423     fn typecheck_guest(
424         &self,
425         types: &InstanceType<'_>,
426         payload: Option<&InterfaceType>,
427         equivalent: fn(&T, &InstanceType<'_>, Option<&InterfaceType>) -> bool,
428     ) -> Result<()> {
429         match self {
430             Self::Guest(ty) => {
431                 if equivalent(ty, types, payload) {
432                     Ok(())
433                 } else {
434                     bail!("future payload types differ")
435                 }
436             }
437             Self::Host { typecheck, .. } => {
438                 typecheck(payload, types).context("future payload types differ")
439             }
440         }
441     }
442 
typecheck_host<P>(&self, equivalent: fn(&T) -> Result<()>) -> Result<()> where P: ComponentType + 'static,443     fn typecheck_host<P>(&self, equivalent: fn(&T) -> Result<()>) -> Result<()>
444     where
445         P: ComponentType + 'static,
446     {
447         match self {
448             Self::Guest(ty) => equivalent(ty),
449             Self::Host { id, .. } => {
450                 if *id == TypeId::of::<P>() {
451                     Ok(())
452                 } else {
453                     bail!("future payload types differ")
454                 }
455             }
456         }
457     }
458 }
459