aboutsummaryrefslogtreecommitdiffhomepage
path: root/crates/shirabe/src/util
diff options
context:
space:
mode:
authornsfisis <nsfisis@gmail.com>2026-05-23 15:45:33 +0900
committernsfisis <nsfisis@gmail.com>2026-05-23 15:48:00 +0900
commitbd6d0186d2c01a3e1d6324ad5a0bcdd71de53098 (patch)
tree939eb1dccbfb3341a2f618e734ca23ef84a8e5cc /crates/shirabe/src/util
parente068a9d644fde6659a88accd55b3f1d0d9d7cf46 (diff)
downloadphp-shirabe-bd6d0186d2c01a3e1d6324ad5a0bcdd71de53098.tar.gz
php-shirabe-bd6d0186d2c01a3e1d6324ad5a0bcdd71de53098.tar.zst
php-shirabe-bd6d0186d2c01a3e1d6324ad5a0bcdd71de53098.zip
refactor(promise): drop \React\Promise
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Diffstat (limited to 'crates/shirabe/src/util')
-rw-r--r--crates/shirabe/src/util/filesystem.rs51
-rw-r--r--crates/shirabe/src/util/http/curl_downloader.rs1
-rw-r--r--crates/shirabe/src/util/http_downloader.rs267
-rw-r--r--crates/shirabe/src/util/loop.rs90
-rw-r--r--crates/shirabe/src/util/process_executor.rs91
-rw-r--r--crates/shirabe/src/util/sync_helper.rs52
6 files changed, 322 insertions, 230 deletions
diff --git a/crates/shirabe/src/util/filesystem.rs b/crates/shirabe/src/util/filesystem.rs
index 1bf4673..73f21d6 100644
--- a/crates/shirabe/src/util/filesystem.rs
+++ b/crates/shirabe/src/util/filesystem.rs
@@ -149,43 +149,26 @@ impl Filesystem {
vec!["rm".to_string(), "-rf".to_string(), directory.to_string()]
};
- // TODO(phase-c-promise): execute_async is now async fn -> Result<Process>; the .then_boxed continuation that
- // inspects the process result and flattens into a recursive removeDirectoryPhp fallback needs job-machine
- // boundary design before it can become a flat await chain.
- let promise = self.get_process().execute_async(
- PhpMixed::List(
- cmd.iter()
- .map(|s| Box::new(PhpMixed::String(s.clone())))
- .collect(),
- ),
- (),
- )?;
+ let process = self
+ .get_process()
+ .execute_async(
+ PhpMixed::List(
+ cmd.iter()
+ .map(|s| Box::new(PhpMixed::String(s.clone())))
+ .collect(),
+ ),
+ (),
+ )
+ .await?;
- let directory_owned = directory.to_string();
- // TODO(plugin): closure capture of $this in PHP — port wires the same logic via a callback handle.
- Ok(promise.then_boxed(
- Some(Box::new(
- move |process: PhpMixed| -> Box<dyn PromiseInterface> {
- // clear stat cache because external processes aren't tracked by the php stat cache
- clearstatcache2(false, "");
+ // clear stat cache because external processes aren't tracked by the php stat cache
+ clearstatcache2(false, "");
- // TODO(phase-b): ArrayObject has no call_method; PHP-side calls $process->isSuccessful().
- let is_successful = matches!(process, PhpMixed::Bool(true));
- if is_successful && !is_dir(&directory_owned) {
- return shirabe_external_packages::react::promise::resolve(Some(
- PhpMixed::Bool(true),
- ));
- }
+ if process.is_successful() && !is_dir(directory) {
+ return Ok(true);
+ }
- // PHP: \React\Promise\resolve($this->removeDirectoryPhp($directory))
- // The recursive PHP call doesn't have a clean async equivalent; we resort to a sync call.
- let mut fs = Filesystem::new(None);
- let res = fs.remove_directory_php(&directory_owned).unwrap_or(false);
- shirabe_external_packages::react::promise::resolve(Some(PhpMixed::Bool(res)))
- },
- )),
- None,
- ))
+ self.remove_directory_php(directory)
}
/// Returns null when no edge case was hit. Otherwise a bool whether removal was successful
diff --git a/crates/shirabe/src/util/http/curl_downloader.rs b/crates/shirabe/src/util/http/curl_downloader.rs
index bd6817c..c3a1227 100644
--- a/crates/shirabe/src/util/http/curl_downloader.rs
+++ b/crates/shirabe/src/util/http/curl_downloader.rs
@@ -36,7 +36,6 @@ use crate::util::Url;
use crate::util::http::CurlResponse;
use crate::util::http::ProxyManager;
use crate::util::{AuthHelper, PromptAuthResult, StoreAuth};
-// use shirabe_external_packages::react::promise::Promise; // typehint only in PHP
/// @phpstan-type Attributes array{retryAuthFailure: bool, redirects: int<0, max>, retries: int<0, max>, storeAuth: 'prompt'|bool, ipResolve: 4|6|null}
/// @phpstan-type Job array{url: non-empty-string, origin: string, attributes: Attributes, options: mixed[], progress: mixed[], curlHandle: \CurlHandle, filename: string|null, headerHandle: resource, bodyHandle: resource, resolve: callable, reject: callable, primaryIp: string}
diff --git a/crates/shirabe/src/util/http_downloader.rs b/crates/shirabe/src/util/http_downloader.rs
index 6f769ce..b1cf921 100644
--- a/crates/shirabe/src/util/http_downloader.rs
+++ b/crates/shirabe/src/util/http_downloader.rs
@@ -16,9 +16,9 @@ use crate::composer;
use crate::composer::ComposerHandle;
use crate::config::Config;
use crate::downloader::TransportException;
-use crate::exception::IrrecoverableDownloadException;
use crate::io::IOInterface;
use crate::package::version::VersionParser;
+use crate::util::GetResult;
use crate::util::Platform;
use crate::util::RemoteFilesystem;
use crate::util::StreamContextFactory;
@@ -60,11 +60,13 @@ struct Job {
request: Request,
sync: bool,
origin: String,
- resolve: Option<Box<dyn Fn(PhpMixed) + Send + Sync>>,
- reject: Option<Box<dyn Fn(PhpMixed) + Send + Sync>>,
curl_id: Option<i64>,
response: Option<Response>,
exception: Option<anyhow::Error>,
+ /// Completion slot written by the curl resolve/reject closures (driven by `curl.tick()`)
+ /// and read by `count_active_jobs`. Uses `Arc<Mutex>` because `CurlDownloader::download`
+ /// requires `Send + Sync` callbacks.
+ settled: std::sync::Arc<std::sync::Mutex<Option<anyhow::Result<Response>>>>,
}
impl std::fmt::Debug for Job {
@@ -178,7 +180,7 @@ impl HttpDownloader {
}
.into());
}
- let (job, promise) = self.add_job(
+ let job = self.add_job(
Request {
url: url.to_string(),
options,
@@ -186,13 +188,6 @@ impl HttpDownloader {
},
true,
)?;
- promise.then_with(
- None,
- Some(Box::new(|_e: PhpMixed| {
- // suppress error as it is rethrown to the caller by getResponse() a few lines below
- PhpMixed::Null
- })),
- );
self.wait_id(Some(job.id))?;
let response = self.get_response(job.id)?;
@@ -213,7 +208,7 @@ impl HttpDownloader {
}
.into());
}
- let (_, promise) = self.add_job(
+ let job = self.add_job(
Request {
url: url.to_string(),
options,
@@ -221,8 +216,9 @@ impl HttpDownloader {
},
false,
)?;
+ self.wait_id(Some(job.id))?;
- Ok(promise)
+ self.get_response(job.id)
}
/// Copy a file synchronously
@@ -239,7 +235,7 @@ impl HttpDownloader {
}
.into());
}
- let (job, _) = self.add_job(
+ let job = self.add_job(
Request {
url: url.to_string(),
options,
@@ -266,7 +262,7 @@ impl HttpDownloader {
}
.into());
}
- let (_, promise) = self.add_job(
+ let job = self.add_job(
Request {
url: url.to_string(),
options,
@@ -274,8 +270,9 @@ impl HttpDownloader {
},
false,
)?;
+ self.wait_id(Some(job.id))?;
- Ok(promise)
+ self.get_response(job.id)
}
/// Retrieve the options set in the constructor
@@ -289,27 +286,17 @@ impl HttpDownloader {
}
/// @phpstan-param Request $request
- /// @return array{Job, PromiseInterface}
- async fn add_job(&mut self, mut request: Request, sync: bool) -> Result<(JobHandle, Response)> {
+ ///
+ /// Queues a job and starts it if there is capacity. Mirrors PHP `addJob`: for non-curl (rfs)
+ /// jobs the work runs synchronously here (PHP runs it in the Promise resolver during
+ /// construction); for curl jobs the work is driven later by `start_job` / `count_active_jobs`.
+ fn add_job(&mut self, mut request: Request, sync: bool) -> Result<JobHandle> {
request.options = array_replace_recursive(self.options.clone(), request.options);
let id = self.id_gen;
self.id_gen += 1;
let origin = Url::get_origin(&*self.config.borrow(), &request.url);
- let job = Job {
- id,
- status: Self::STATUS_QUEUED,
- request: request.clone(),
- sync,
- origin: origin.clone(),
- resolve: None,
- reject: None,
- curl_id: None,
- response: None,
- exception: None,
- };
-
if !sync && !self.allow_async {
return Err(LogicException {
message:
@@ -346,34 +333,124 @@ impl HttpDownloader {
);
}
- // TODO(phase-b): build resolver/canceler closures bound to &mut self.jobs; needs Rc<RefCell> wiring
- let _ = (&self.rfs, &self.curl);
-
- let resolver: Box<dyn Fn(Box<dyn Fn(PhpMixed)>, Box<dyn Fn(PhpMixed)>)> =
- Box::new(|_resolve, _reject| {
- // TODO(phase-b)
- });
- let canceler: Box<dyn Fn()> = Box::new(|| {
- // PHP canceler logic — TODO(phase-b)
- let _ = IrrecoverableDownloadException(shirabe_php_shim::RuntimeException {
- message: "Download canceled".to_string(),
- code: 0,
- });
- let _ = Url::sanitize(String::new());
- });
- let _ = (resolver, canceler);
-
- let promise = Promise::new(Box::new(|_resolve, _reject| {}));
- // TODO(phase-b): wire promise.then() side-effects: mark job done & store response/exception
- let promise: Box<dyn PromiseInterface> = Box::new(promise);
-
+ let job = Job {
+ id,
+ status: Self::STATUS_QUEUED,
+ request: request.clone(),
+ sync,
+ origin,
+ curl_id: None,
+ response: None,
+ exception: None,
+ settled: std::sync::Arc::new(std::sync::Mutex::new(None)),
+ };
+ let can_use_curl = self.can_use_curl(&job);
self.jobs.insert(id, job);
+ // PHP runs the resolver synchronously while constructing the Promise. For non-curl jobs
+ // the resolver performs the blocking RemoteFilesystem download and resolves immediately.
+ if !can_use_curl {
+ self.run_rfs_job(id);
+ }
+
if self.running_jobs < self.max_jobs {
self.start_job(id);
}
- Ok((JobHandle { id }, promise))
+ Ok(JobHandle { id })
+ }
+
+ /// Mirrors the non-curl branch of PHP `addJob`'s Promise resolver plus the `.then` side
+ /// effects: performs the blocking RemoteFilesystem download and settles the job.
+ fn run_rfs_job(&mut self, id: i64) {
+ let (request, origin, url, options, copy_to) = {
+ let job = self.jobs.get(&id).unwrap();
+ (
+ job.request.clone(),
+ job.origin.clone(),
+ job.request.url.clone(),
+ job.request.options.clone(),
+ job.request.copy_to.clone(),
+ )
+ };
+
+ if let Some(job) = self.jobs.get_mut(&id) {
+ job.status = Self::STATUS_STARTED;
+ }
+
+ let result: anyhow::Result<Response> = {
+ let rfs = self.rfs.as_mut().unwrap();
+ (|| -> anyhow::Result<Response> {
+ if let Some(copy_to) = copy_to.as_deref() {
+ rfs.copy(&origin, &url, copy_to, false, options.clone())?;
+
+ let headers = rfs.get_last_headers().to_vec();
+ let code = RemoteFilesystem::find_status_code(&headers);
+ let body = Some(format!("{}~", copy_to));
+ match Response::new(Self::request_to_map(&request), code, headers, body)? {
+ Ok(r) => Ok(r),
+ Err(e) => Err(e.into()),
+ }
+ } else {
+ let body = match rfs.get_contents(&origin, &url, false, options.clone())? {
+ GetResult::Content(s) => Some(s),
+ _ => None,
+ };
+ let headers = rfs.get_last_headers().to_vec();
+ let code = RemoteFilesystem::find_status_code(&headers);
+ match Response::new(Self::request_to_map(&request), code, headers, body)? {
+ Ok(r) => Ok(r),
+ Err(e) => Err(e.into()),
+ }
+ }
+ })()
+ };
+
+ self.settle_job(id, result);
+ }
+
+ /// PHP `new Response($job['request'], ...)` is fed the whole request array; reproduce it.
+ fn request_to_map(request: &Request) -> IndexMap<String, PhpMixed> {
+ let mut m: IndexMap<String, PhpMixed> = IndexMap::new();
+ m.insert("url".to_string(), PhpMixed::String(request.url.clone()));
+ m.insert(
+ "options".to_string(),
+ PhpMixed::Array(
+ request
+ .options
+ .iter()
+ .map(|(k, v)| (k.clone(), Box::new(v.clone())))
+ .collect(),
+ ),
+ );
+ m.insert(
+ "copyTo".to_string(),
+ match &request.copy_to {
+ Some(s) => PhpMixed::String(s.clone()),
+ None => PhpMixed::Null,
+ },
+ );
+ m
+ }
+
+ /// Applies the effect of PHP's promise `.then` handlers: records the response/exception,
+ /// transitions the job status and decrements the running-job counter.
+ fn settle_job(&mut self, id: i64, result: anyhow::Result<Response>) {
+ match result {
+ Ok(response) => {
+ if let Some(job) = self.jobs.get_mut(&id) {
+ job.status = Self::STATUS_COMPLETED;
+ job.response = Some(response);
+ }
+ }
+ Err(e) => {
+ if let Some(job) = self.jobs.get_mut(&id) {
+ job.status = Self::STATUS_FAILED;
+ job.exception = Some(e);
+ }
+ }
+ }
+ self.mark_job_done();
}
fn start_job(&mut self, id: i64) {
@@ -398,7 +475,6 @@ impl HttpDownloader {
};
let url = request.url.clone();
let options = request.options.clone();
- let _ = origin;
if self.disabled {
let has_if_modified_since = {
@@ -433,8 +509,13 @@ impl HttpDownloader {
if has_if_modified_since {
let mut req_map: IndexMap<String, PhpMixed> = IndexMap::new();
req_map.insert("url".to_string(), PhpMixed::String(url.clone()));
- let _ = Response::new(req_map, Some(304), Vec::new(), Some(String::new()));
- // job.resolve(response) — TODO(phase-b)
+ let response =
+ match Response::new(req_map, Some(304), Vec::new(), Some(String::new())) {
+ Ok(Ok(r)) => Ok(r),
+ Ok(Err(e)) => Err(e.into()),
+ Err(e) => Err(e),
+ };
+ self.settle_job(id, response);
} else {
let mut e = TransportException::new(
format!(
@@ -444,15 +525,45 @@ impl HttpDownloader {
499,
);
e.set_status_code(Some(499));
- // job.reject(e) — TODO(phase-b)
- let _ = e;
+ self.settle_job(id, Err(e.into()));
}
return;
}
- let _ = copy_to;
- // TODO(phase-b): try { curl->download(...) } catch (...) { reject(e) }
+ // curl branch: register the request with the curl multi handle. Completion is delivered
+ // asynchronously by curl.tick() into the job's `settled` slot (read by count_active_jobs).
+ // PHP catches any exception from download() and rejects the job.
+ let settled = self.jobs.get(&id).unwrap().settled.clone();
+ let settled_for_reject = settled.clone();
+ let resolve: Box<dyn Fn(PhpMixed) + Send + Sync> = Box::new(move |_response: PhpMixed| {
+ // TODO(phase-c-promise): curl.tick() delivers the response as PhpMixed here; convert it
+ // into a Response and store Ok(..) in `settled`. Bottoms at the todo!() curl I/O.
+ let _ = &settled;
+ });
+ let reject: Box<dyn Fn(PhpMixed) + Send + Sync> = Box::new(move |_error: PhpMixed| {
+ // TODO(phase-c-promise): convert the PhpMixed error into anyhow::Error and store
+ // Err(..) in `settled`. Bottoms at the todo!() curl I/O.
+ let _ = &settled_for_reject;
+ });
+
+ let download_result = {
+ let curl = self.curl.as_mut().unwrap();
+ curl.download(resolve, reject, &origin, &url, options, copy_to.as_deref())
+ };
+ match download_result {
+ Ok(Ok(curl_id)) => {
+ if let Some(job) = self.jobs.get_mut(&id) {
+ job.curl_id = Some(curl_id);
+ }
+ }
+ Ok(Err(e)) => {
+ self.settle_job(id, Err(e.into()));
+ }
+ Err(e) => {
+ self.settle_job(id, Err(e));
+ }
+ }
}
fn mark_job_done(&mut self) {
@@ -468,7 +579,7 @@ impl HttpDownloader {
fn wait_id(&mut self, index: Option<i64>) -> Result<()> {
loop {
- let job_count = self.count_active_jobs(index);
+ let job_count = self.count_active_jobs(index)?;
if job_count == 0 {
break;
}
@@ -482,7 +593,7 @@ impl HttpDownloader {
}
/// @internal
- pub fn count_active_jobs(&mut self, index: Option<i64>) -> i64 {
+ pub fn count_active_jobs(&mut self, index: Option<i64>) -> Result<i64> {
if self.running_jobs < self.max_jobs {
let queued_ids: Vec<i64> = self
.jobs
@@ -499,16 +610,32 @@ impl HttpDownloader {
}
if let Some(curl) = self.curl.as_mut() {
- curl.tick();
+ curl.tick()?;
+ }
+
+ // Apply completions delivered by curl.tick() into each started job's `settled` slot.
+ // This reproduces the effect of PHP's resolve/reject callbacks firing during tick().
+ let started_ids: Vec<i64> = self
+ .jobs
+ .values()
+ .filter(|j| j.status == Self::STATUS_STARTED)
+ .map(|j| j.id)
+ .collect();
+ for id in started_ids {
+ let settled = self.jobs.get(&id).unwrap().settled.lock().unwrap().take();
+ if let Some(result) = settled {
+ self.settle_job(id, result);
+ }
}
if let Some(index) = index {
- return if self.jobs.get(&index).map(|j| j.status).unwrap_or(0) < Self::STATUS_COMPLETED
- {
- 1
- } else {
- 0
- };
+ return Ok(
+ if self.jobs.get(&index).map(|j| j.status).unwrap_or(0) < Self::STATUS_COMPLETED {
+ 1
+ } else {
+ 0
+ },
+ );
}
let mut active: i64 = 0;
@@ -525,7 +652,7 @@ impl HttpDownloader {
}
}
- active
+ Ok(active)
}
/// @param int $index Job id
diff --git a/crates/shirabe/src/util/loop.rs b/crates/shirabe/src/util/loop.rs
index 18f2359..5512211 100644
--- a/crates/shirabe/src/util/loop.rs
+++ b/crates/shirabe/src/util/loop.rs
@@ -3,25 +3,12 @@
use crate::util::HttpDownloader;
use crate::util::ProcessExecutor;
use anyhow::Result;
-use indexmap::IndexMap;
use shirabe_external_packages::symfony::component::console::helper::ProgressBar;
-use shirabe_php_shim::microtime;
+#[derive(Debug)]
pub struct Loop {
http_downloader: std::rc::Rc<std::cell::RefCell<HttpDownloader>>,
process_executor: Option<std::rc::Rc<std::cell::RefCell<ProcessExecutor>>>,
- current_promises: IndexMap<i64, Vec<Box<dyn PromiseInterface>>>,
- wait_index: i64,
-}
-
-impl std::fmt::Debug for Loop {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- f.debug_struct("Loop")
- .field("http_downloader", &self.http_downloader)
- .field("process_executor", &self.process_executor)
- .field("wait_index", &self.wait_index)
- .finish()
- }
}
impl Loop {
@@ -39,8 +26,6 @@ impl Loop {
Self {
http_downloader,
process_executor,
- current_promises: IndexMap::new(),
- wait_index: 0,
}
}
@@ -54,65 +39,25 @@ impl Loop {
self.process_executor.as_ref()
}
- pub fn wait(
+ pub async fn wait<'p>(
&mut self,
- promises: Vec<Box<dyn PromiseInterface>>,
- mut progress: Option<&mut ProgressBar>,
+ promises: Vec<std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + 'p>>>,
+ _progress: Option<&mut ProgressBar>,
) -> Result<()> {
- let uncaught: Option<anyhow::Error> = None;
-
- // TODO(phase-b): Promise::then captures uncaught by Fn; needs a Cell/RefCell wrapper
- // and a thunk that matches FnOnce(Option<PhpMixed>) -> Option<PhpMixed>.
- let _ = shirabe_external_packages::react::promise::all(
- promises
- .iter()
- .map(|_| todo!("clone Box<dyn PromiseInterface>"))
- .collect(),
- );
-
- // keep track of every group of promises that is waited on, so abortJobs can
- // cancel them all, even if wait() was called within a wait()
- let wait_index = self.wait_index;
- self.wait_index += 1;
- self.current_promises.insert(wait_index, promises);
-
- if let Some(ref mut progress) = progress {
- let mut total_jobs: i64 = 0;
- total_jobs += self.http_downloader.borrow_mut().count_active_jobs(None);
- if let Some(ref pe) = self.process_executor {
- total_jobs += pe.borrow_mut().count_active_jobs(None);
- }
- progress.start(Some(total_jobs));
- }
-
- let mut last_update: f64 = 0.0;
- loop {
- let mut active_jobs: i64 = 0;
+ let mut uncaught: Option<anyhow::Error> = None;
- active_jobs += self.http_downloader.borrow_mut().count_active_jobs(None);
- if let Some(ref pe) = self.process_executor {
- active_jobs += pe.borrow_mut().count_active_jobs(None);
- }
-
- if let Some(ref mut progress) = progress {
- if microtime(true) - last_update > 0.1 {
- last_update = microtime(true);
- let new_progress = progress.get_max_steps() - active_jobs;
- progress.set_progress(new_progress);
+ // TODO(phase-c-promise): the asynchronous worker classes (HttpDownloader / ProcessExecutor)
+ // run single-threaded for now, so the promises are consumed serially. Once the workers run
+ // on a multi-thread runtime these futures should be driven concurrently instead of in order.
+ // The PHP progress bar is tied to the worker active-job count and is also deferred until then.
+ for promise in promises {
+ if let Err(e) = promise.await {
+ if uncaught.is_none() {
+ uncaught = Some(e);
}
}
-
- if active_jobs == 0 {
- break;
- }
}
- // as we skip progress updates if they are too quick, make sure we do one last one here at 100%
- if let Some(ref mut progress) = progress {
- progress.finish();
- }
-
- self.current_promises.remove(&wait_index);
if let Some(e) = uncaught {
return Err(e);
}
@@ -121,11 +66,8 @@ impl Loop {
}
pub fn abort_jobs(&self) {
- for promise_group in self.current_promises.values() {
- for _promise in promise_group {
- // TODO(phase-b): cancel requires CancellablePromiseInterface; PromiseInterface trait
- // doesn't expose it. Drop the wrap+cancel until we have the right trait.
- }
- }
+ // TODO(phase-c-promise): no-op until a cancellation mechanism is introduced. PHP cancels
+ // every in-flight promise group it tracks in $currentPromises; reintroduce that tracking
+ // once the asynchronous workers support cancellation on a multi-thread runtime.
}
}
diff --git a/crates/shirabe/src/util/process_executor.rs b/crates/shirabe/src/util/process_executor.rs
index 1ee821c..74fc30c 100644
--- a/crates/shirabe/src/util/process_executor.rs
+++ b/crates/shirabe/src/util/process_executor.rs
@@ -53,8 +53,7 @@ struct Job {
command: PhpMixed,
cwd: Option<String>,
process: Option<Process>,
- resolve: Option<Box<dyn Fn(PhpMixed) + Send + Sync>>,
- reject: Option<Box<dyn Fn(PhpMixed) + Send + Sync>>,
+ exception: Option<anyhow::Error>,
}
impl std::fmt::Debug for Job {
@@ -396,32 +395,30 @@ impl ProcessExecutor {
command,
cwd: cwd_opt,
process: None,
- resolve: None,
- reject: None,
+ exception: None,
};
- // TODO(phase-b): build resolver/canceler closures bound to &mut self.jobs
- let resolver: Box<dyn Fn(Option<PhpMixed>, Option<PhpMixed>)> =
- Box::new(|_resolve, _reject| {});
- let canceler: Box<dyn Fn()> = Box::new(|| {
- if defined("SIGINT") {
- // job.process.signal(SIGINT)
- }
- // job.process.stop(1)
- });
- let _ = (resolver, canceler);
-
- let promise = Promise::new(Box::new(|_resolve, _reject| {}));
- // TODO(phase-b): wire promise.then() side-effects: mark job done & update status
- let promise: Box<dyn PromiseInterface> = Box::new(promise);
-
self.jobs.insert(id, job);
if self.running_jobs < self.max_jobs {
self.start_job(id);
}
- Ok(promise)
+ // Drive the job to completion (serial pump). PHP resolves the promise with the Process
+ // once it stops running; here we await by pumping count_active_jobs and then hand back the
+ // Process (or the rejection captured during start_job).
+ self.wait_id(Some(id))?;
+
+ let mut job = self.jobs.shift_remove(&id).unwrap();
+ if let Some(process) = job.process.take() {
+ Ok(process)
+ } else if let Some(e) = job.exception.take() {
+ Err(e)
+ } else {
+ Err(anyhow::anyhow!(
+ "ProcessExecutor async job completed without a process"
+ ))
+ }
}
fn output_handler(&mut self, r#type: &str, buffer: &str) {
@@ -496,8 +493,13 @@ impl ProcessExecutor {
})();
let process = match process_result {
Ok(p) => p,
- Err(_e) => {
- // job.reject(e) — TODO(phase-b)
+ Err(e) => {
+ // PHP: $job['reject']($e) — record the rejection and settle the job as failed.
+ if let Some(job) = self.jobs.get_mut(&id) {
+ job.status = Self::STATUS_FAILED;
+ job.exception = Some(e);
+ }
+ self.mark_job_done();
return;
}
};
@@ -544,7 +546,7 @@ impl ProcessExecutor {
pub fn wait_id(&mut self, index: Option<i64>) -> Result<()> {
loop {
- if 0 == self.count_active_jobs(index) {
+ if 0 == self.count_active_jobs(index)? {
return Ok(());
}
@@ -558,7 +560,7 @@ impl ProcessExecutor {
}
/// @internal
- pub fn count_active_jobs(&mut self, index: Option<i64>) -> i64 {
+ pub fn count_active_jobs(&mut self, index: Option<i64>) -> Result<i64> {
// tick
let ids: Vec<i64> = self.jobs.keys().copied().collect();
for id in &ids {
@@ -575,18 +577,26 @@ impl ProcessExecutor {
.map(|p| p.is_running())
.unwrap_or(false);
if !is_running {
- if let Some(job) = self.jobs.get(id) {
- if let Some(resolve) = job.resolve.as_ref() {
- let process_mixed = PhpMixed::Null; // TODO(phase-b): wrap Process as PhpMixed
- resolve(process_mixed);
- }
+ // PHP: call_user_func($job['resolve'], $job['process']) — the .then handler
+ // marks the job completed/failed based on the process exit status.
+ let successful = self
+ .jobs
+ .get(id)
+ .and_then(|j| j.process.as_ref())
+ .map(|p| p.is_successful())
+ .unwrap_or(false);
+ if let Some(job) = self.jobs.get_mut(id) {
+ job.status = if successful {
+ Self::STATUS_COMPLETED
+ } else {
+ Self::STATUS_FAILED
+ };
}
+ self.mark_job_done();
}
- if let Some(job) = self.jobs.get_mut(id) {
- if let Some(p) = job.process.as_mut() {
- p.check_timeout();
- }
+ if let Some(p) = self.jobs.get(id).and_then(|j| j.process.as_ref()) {
+ p.check_timeout()?;
}
}
}
@@ -600,12 +610,13 @@ impl ProcessExecutor {
}
if let Some(index) = index {
- return if self.jobs.get(&index).map(|j| j.status).unwrap_or(0) < Self::STATUS_COMPLETED
- {
- 1
- } else {
- 0
- };
+ return Ok(
+ if self.jobs.get(&index).map(|j| j.status).unwrap_or(0) < Self::STATUS_COMPLETED {
+ 1
+ } else {
+ 0
+ },
+ );
}
let mut active: i64 = 0;
@@ -619,7 +630,7 @@ impl ProcessExecutor {
}
}
- active
+ Ok(active)
}
fn mark_job_done(&mut self) {
diff --git a/crates/shirabe/src/util/sync_helper.rs b/crates/shirabe/src/util/sync_helper.rs
index 4c48c5c..7388b96 100644
--- a/crates/shirabe/src/util/sync_helper.rs
+++ b/crates/shirabe/src/util/sync_helper.rs
@@ -86,7 +86,6 @@ impl<'a> DownloaderOrManager<'a> {
pub struct SyncHelper;
impl SyncHelper {
- // TODO(phase-c-promise): synchronous wrapper driving now-async downloader calls via Self::await (loop.wait); needs async/loop boundary design.
pub fn download_and_install_package_sync(
r#loop: &std::rc::Rc<std::cell::RefCell<Loop>>,
downloader: DownloaderOrManager<'_>,
@@ -100,21 +99,41 @@ impl SyncHelper {
"install"
};
- let result: Result<()> = (|| {
+ let result: Result<()> = (|| -> Result<()> {
Self::r#await(
r#loop,
- Some(downloader.download(package, &path, prev_package)?),
+ Some(Box::pin(async {
+ downloader
+ .download(package, &path, prev_package)
+ .await
+ .map(|_| ())
+ })),
)?;
Self::r#await(
r#loop,
- Some(downloader.prepare(r#type, package, &path, prev_package)?),
+ Some(Box::pin(async {
+ downloader
+ .prepare(r#type, package, &path, prev_package)
+ .await
+ .map(|_| ())
+ })),
)?;
if r#type == "update" {
if let Some(prev) = prev_package {
- Self::r#await(r#loop, Some(downloader.update(package, prev, &path)?))?;
+ Self::r#await(
+ r#loop,
+ Some(Box::pin(async {
+ downloader.update(package, prev, &path).await.map(|_| ())
+ })),
+ )?;
}
} else {
- Self::r#await(r#loop, Some(downloader.install(package, &path)?))?;
+ Self::r#await(
+ r#loop,
+ Some(Box::pin(async {
+ downloader.install(package, &path).await.map(|_| ())
+ })),
+ )?;
}
Ok(())
})();
@@ -122,25 +141,36 @@ impl SyncHelper {
if result.is_err() {
Self::r#await(
r#loop,
- Some(downloader.cleanup(r#type, package, &path, prev_package)?),
+ Some(Box::pin(async {
+ downloader
+ .cleanup(r#type, package, &path, prev_package)
+ .await
+ .map(|_| ())
+ })),
)?;
return result;
}
Self::r#await(
r#loop,
- Some(downloader.cleanup(r#type, package, &path, prev_package)?),
+ Some(Box::pin(async {
+ downloader
+ .cleanup(r#type, package, &path, prev_package)
+ .await
+ .map(|_| ())
+ })),
)?;
Ok(())
}
- // TODO(phase-c-promise): loop-pump synchronous wait over a promise; driving mechanism needs design.
pub fn r#await(
r#loop: &std::rc::Rc<std::cell::RefCell<Loop>>,
- promise: Option<Box<dyn PromiseInterface>>,
+ promise: Option<std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + '_>>>,
) -> Result<()> {
if let Some(promise) = promise {
- r#loop.borrow_mut().wait(vec![promise], None)?;
+ tokio::runtime::Runtime::new()
+ .unwrap()
+ .block_on(r#loop.borrow_mut().wait(vec![promise], None))?;
}
Ok(())
}