aboutsummaryrefslogtreecommitdiff
path: root/src/actor.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/actor.rs')
-rw-r--r--src/actor.rs49
1 files changed, 28 insertions, 21 deletions
diff --git a/src/actor.rs b/src/actor.rs
index e83c003..12e351f 100644
--- a/src/actor.rs
+++ b/src/actor.rs
@@ -1,5 +1,5 @@
use std::collections::HashMap;
-use std::sync::mpsc::{Receiver, Sender};
+use std::sync::mpsc::{Receiver, SyncSender};
use std::thread;
use uuid::Uuid;
@@ -44,7 +44,8 @@ pub struct Slot {
pub trait Actorful {
fn inputs(&self) -> Vec<Slot>;
fn outputs(&self) -> Vec<Slot>;
- fn launch(&self, input_channels: HashMap<String, Receiver<Value>>, output_channels: HashMap<String, Sender<Value>>, world: &mut World);
+ fn launch(&self, input_channels: HashMap<String, Receiver<Value>>,
+ output_channels: HashMap<String, SyncSender<Value>>, world: &mut World) -> Box<dyn FnOnce() + Send>;
fn boxed_clone(&self) -> Box<dyn Actorful + Send>;
}
@@ -89,27 +90,33 @@ impl Actorful for ProgrammableActor {
self.outputs.clone()
}
- fn launch(&self, mut input_channels: HashMap<String, Receiver<Value>>, mut output_channels: HashMap<String, Sender<Value>>, world: &mut World) {
- let mut child_channels = HashMap::new();
+ fn launch(&self, mut input_channels: HashMap<String, Receiver<Value>>,
+ mut output_channels: HashMap<String, SyncSender<Value>>, world: &mut World) -> Box<dyn FnOnce() + Send> {
+ let mut child_inputs = HashMap::new();
+ let mut child_outputs = HashMap::new();
for (&child_id, child_type) in &self.children {
- let (_, child_inputs, child_outputs) = world.spawn_actor(child_type);
- child_channels.insert(child_id, (child_inputs, child_outputs));
- }
- for (source, dest) in &self.cables {
- let source = match source {
- ProducerSlotID::Input(name) => input_channels.remove(name).unwrap(),
- ProducerSlotID::ChildOutput(id, name) => child_channels[id].1.remove(name).unwrap()
- };
- let dest = match dest {
- ConsumerSlotID::Output(name) => output_channels.remove(name).unwrap(),
- ConsumerSlotID::ChildInput(id, name) => child_channels[id].0.remove(name).unwrap()
- };
- thread::spawn(move || {
- for val in source.iter() {
- dest.send(val).unwrap()
- }
- });
+ let (_, my_inputs, my_outputs) = world.spawn_actor(child_type);
+ child_inputs.extend(my_inputs.into_iter().map(|(name, sender)| ((child_id, name), sender)));
+ child_outputs.extend(my_outputs.into_iter().map(|(name, receiver)| ((child_id, name), receiver)));
}
+ let cables = self.cables.clone();
+ Box::new(move || {
+ for (source, dest) in cables {
+ let source = match source {
+ ProducerSlotID::Input(name) => input_channels.remove(&name).unwrap(),
+ ProducerSlotID::ChildOutput(id, name) => child_outputs.remove(&(id, name)).unwrap()
+ };
+ let dest = match dest {
+ ConsumerSlotID::Output(name) => output_channels.remove(&name).unwrap(),
+ ConsumerSlotID::ChildInput(id, name) => child_inputs.remove(&(id, name)).unwrap()
+ };
+ thread::spawn(move || {
+ for val in source.iter() {
+ dest.send(val).unwrap()
+ }
+ });
+ }
+ })
}
fn boxed_clone(&self) -> Box<dyn Actorful + Send> {