aboutsummaryrefslogtreecommitdiffhomepage
path: root/crates/shirabe/src/util/process_executor.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/shirabe/src/util/process_executor.rs')
-rw-r--r--crates/shirabe/src/util/process_executor.rs91
1 files changed, 51 insertions, 40 deletions
diff --git a/crates/shirabe/src/util/process_executor.rs b/crates/shirabe/src/util/process_executor.rs
index 1ee821c..74fc30c 100644
--- a/crates/shirabe/src/util/process_executor.rs
+++ b/crates/shirabe/src/util/process_executor.rs
@@ -53,8 +53,7 @@ struct Job {
command: PhpMixed,
cwd: Option<String>,
process: Option<Process>,
- resolve: Option<Box<dyn Fn(PhpMixed) + Send + Sync>>,
- reject: Option<Box<dyn Fn(PhpMixed) + Send + Sync>>,
+ exception: Option<anyhow::Error>,
}
impl std::fmt::Debug for Job {
@@ -396,32 +395,30 @@ impl ProcessExecutor {
command,
cwd: cwd_opt,
process: None,
- resolve: None,
- reject: None,
+ exception: None,
};
- // TODO(phase-b): build resolver/canceler closures bound to &mut self.jobs
- let resolver: Box<dyn Fn(Option<PhpMixed>, Option<PhpMixed>)> =
- Box::new(|_resolve, _reject| {});
- let canceler: Box<dyn Fn()> = Box::new(|| {
- if defined("SIGINT") {
- // job.process.signal(SIGINT)
- }
- // job.process.stop(1)
- });
- let _ = (resolver, canceler);
-
- let promise = Promise::new(Box::new(|_resolve, _reject| {}));
- // TODO(phase-b): wire promise.then() side-effects: mark job done & update status
- let promise: Box<dyn PromiseInterface> = Box::new(promise);
-
self.jobs.insert(id, job);
if self.running_jobs < self.max_jobs {
self.start_job(id);
}
- Ok(promise)
+ // Drive the job to completion (serial pump). PHP resolves the promise with the Process
+ // once it stops running; here we await by pumping count_active_jobs and then hand back the
+ // Process (or the rejection captured during start_job).
+ self.wait_id(Some(id))?;
+
+ let mut job = self.jobs.shift_remove(&id).unwrap();
+ if let Some(process) = job.process.take() {
+ Ok(process)
+ } else if let Some(e) = job.exception.take() {
+ Err(e)
+ } else {
+ Err(anyhow::anyhow!(
+ "ProcessExecutor async job completed without a process"
+ ))
+ }
}
fn output_handler(&mut self, r#type: &str, buffer: &str) {
@@ -496,8 +493,13 @@ impl ProcessExecutor {
})();
let process = match process_result {
Ok(p) => p,
- Err(_e) => {
- // job.reject(e) — TODO(phase-b)
+ Err(e) => {
+ // PHP: $job['reject']($e) — record the rejection and settle the job as failed.
+ if let Some(job) = self.jobs.get_mut(&id) {
+ job.status = Self::STATUS_FAILED;
+ job.exception = Some(e);
+ }
+ self.mark_job_done();
return;
}
};
@@ -544,7 +546,7 @@ impl ProcessExecutor {
pub fn wait_id(&mut self, index: Option<i64>) -> Result<()> {
loop {
- if 0 == self.count_active_jobs(index) {
+ if 0 == self.count_active_jobs(index)? {
return Ok(());
}
@@ -558,7 +560,7 @@ impl ProcessExecutor {
}
/// @internal
- pub fn count_active_jobs(&mut self, index: Option<i64>) -> i64 {
+ pub fn count_active_jobs(&mut self, index: Option<i64>) -> Result<i64> {
// tick
let ids: Vec<i64> = self.jobs.keys().copied().collect();
for id in &ids {
@@ -575,18 +577,26 @@ impl ProcessExecutor {
.map(|p| p.is_running())
.unwrap_or(false);
if !is_running {
- if let Some(job) = self.jobs.get(id) {
- if let Some(resolve) = job.resolve.as_ref() {
- let process_mixed = PhpMixed::Null; // TODO(phase-b): wrap Process as PhpMixed
- resolve(process_mixed);
- }
+ // PHP: call_user_func($job['resolve'], $job['process']) — the .then handler
+ // marks the job completed/failed based on the process exit status.
+ let successful = self
+ .jobs
+ .get(id)
+ .and_then(|j| j.process.as_ref())
+ .map(|p| p.is_successful())
+ .unwrap_or(false);
+ if let Some(job) = self.jobs.get_mut(id) {
+ job.status = if successful {
+ Self::STATUS_COMPLETED
+ } else {
+ Self::STATUS_FAILED
+ };
}
+ self.mark_job_done();
}
- if let Some(job) = self.jobs.get_mut(id) {
- if let Some(p) = job.process.as_mut() {
- p.check_timeout();
- }
+ if let Some(p) = self.jobs.get(id).and_then(|j| j.process.as_ref()) {
+ p.check_timeout()?;
}
}
}
@@ -600,12 +610,13 @@ impl ProcessExecutor {
}
if let Some(index) = index {
- return if self.jobs.get(&index).map(|j| j.status).unwrap_or(0) < Self::STATUS_COMPLETED
- {
- 1
- } else {
- 0
- };
+ return Ok(
+ if self.jobs.get(&index).map(|j| j.status).unwrap_or(0) < Self::STATUS_COMPLETED {
+ 1
+ } else {
+ 0
+ },
+ );
}
let mut active: i64 = 0;
@@ -619,7 +630,7 @@ impl ProcessExecutor {
}
}
- active
+ Ok(active)
}
fn mark_job_done(&mut self) {