diff options
Diffstat (limited to 'crates/shirabe/src/util')
| -rw-r--r-- | crates/shirabe/src/util/filesystem.rs | 51 | ||||
| -rw-r--r-- | crates/shirabe/src/util/http/curl_downloader.rs | 1 | ||||
| -rw-r--r-- | crates/shirabe/src/util/http_downloader.rs | 267 | ||||
| -rw-r--r-- | crates/shirabe/src/util/loop.rs | 90 | ||||
| -rw-r--r-- | crates/shirabe/src/util/process_executor.rs | 91 | ||||
| -rw-r--r-- | crates/shirabe/src/util/sync_helper.rs | 52 |
6 files changed, 322 insertions, 230 deletions
diff --git a/crates/shirabe/src/util/filesystem.rs b/crates/shirabe/src/util/filesystem.rs index 1bf4673..73f21d6 100644 --- a/crates/shirabe/src/util/filesystem.rs +++ b/crates/shirabe/src/util/filesystem.rs @@ -149,43 +149,26 @@ impl Filesystem { vec!["rm".to_string(), "-rf".to_string(), directory.to_string()] }; - // TODO(phase-c-promise): execute_async is now async fn -> Result<Process>; the .then_boxed continuation that - // inspects the process result and flattens into a recursive removeDirectoryPhp fallback needs job-machine - // boundary design before it can become a flat await chain. - let promise = self.get_process().execute_async( - PhpMixed::List( - cmd.iter() - .map(|s| Box::new(PhpMixed::String(s.clone()))) - .collect(), - ), - (), - )?; + let process = self + .get_process() + .execute_async( + PhpMixed::List( + cmd.iter() + .map(|s| Box::new(PhpMixed::String(s.clone()))) + .collect(), + ), + (), + ) + .await?; - let directory_owned = directory.to_string(); - // TODO(plugin): closure capture of $this in PHP — port wires the same logic via a callback handle. - Ok(promise.then_boxed( - Some(Box::new( - move |process: PhpMixed| -> Box<dyn PromiseInterface> { - // clear stat cache because external processes aren't tracked by the php stat cache - clearstatcache2(false, ""); + // clear stat cache because external processes aren't tracked by the php stat cache + clearstatcache2(false, ""); - // TODO(phase-b): ArrayObject has no call_method; PHP-side calls $process->isSuccessful(). - let is_successful = matches!(process, PhpMixed::Bool(true)); - if is_successful && !is_dir(&directory_owned) { - return shirabe_external_packages::react::promise::resolve(Some( - PhpMixed::Bool(true), - )); - } + if process.is_successful() && !is_dir(directory) { + return Ok(true); + } - // PHP: \React\Promise\resolve($this->removeDirectoryPhp($directory)) - // The recursive PHP call doesn't have a clean async equivalent; we resort to a sync call. - let mut fs = Filesystem::new(None); - let res = fs.remove_directory_php(&directory_owned).unwrap_or(false); - shirabe_external_packages::react::promise::resolve(Some(PhpMixed::Bool(res))) - }, - )), - None, - )) + self.remove_directory_php(directory) } /// Returns null when no edge case was hit. Otherwise a bool whether removal was successful diff --git a/crates/shirabe/src/util/http/curl_downloader.rs b/crates/shirabe/src/util/http/curl_downloader.rs index bd6817c..c3a1227 100644 --- a/crates/shirabe/src/util/http/curl_downloader.rs +++ b/crates/shirabe/src/util/http/curl_downloader.rs @@ -36,7 +36,6 @@ use crate::util::Url; use crate::util::http::CurlResponse; use crate::util::http::ProxyManager; use crate::util::{AuthHelper, PromptAuthResult, StoreAuth}; -// use shirabe_external_packages::react::promise::Promise; // typehint only in PHP /// @phpstan-type Attributes array{retryAuthFailure: bool, redirects: int<0, max>, retries: int<0, max>, storeAuth: 'prompt'|bool, ipResolve: 4|6|null} /// @phpstan-type Job array{url: non-empty-string, origin: string, attributes: Attributes, options: mixed[], progress: mixed[], curlHandle: \CurlHandle, filename: string|null, headerHandle: resource, bodyHandle: resource, resolve: callable, reject: callable, primaryIp: string} diff --git a/crates/shirabe/src/util/http_downloader.rs b/crates/shirabe/src/util/http_downloader.rs index 6f769ce..b1cf921 100644 --- a/crates/shirabe/src/util/http_downloader.rs +++ b/crates/shirabe/src/util/http_downloader.rs @@ -16,9 +16,9 @@ use crate::composer; use crate::composer::ComposerHandle; use crate::config::Config; use crate::downloader::TransportException; -use crate::exception::IrrecoverableDownloadException; use crate::io::IOInterface; use crate::package::version::VersionParser; +use crate::util::GetResult; use crate::util::Platform; use crate::util::RemoteFilesystem; use crate::util::StreamContextFactory; @@ -60,11 +60,13 @@ struct Job { request: Request, sync: bool, origin: String, - resolve: Option<Box<dyn Fn(PhpMixed) + Send + Sync>>, - reject: Option<Box<dyn Fn(PhpMixed) + Send + Sync>>, curl_id: Option<i64>, response: Option<Response>, exception: Option<anyhow::Error>, + /// Completion slot written by the curl resolve/reject closures (driven by `curl.tick()`) + /// and read by `count_active_jobs`. Uses `Arc<Mutex>` because `CurlDownloader::download` + /// requires `Send + Sync` callbacks. + settled: std::sync::Arc<std::sync::Mutex<Option<anyhow::Result<Response>>>>, } impl std::fmt::Debug for Job { @@ -178,7 +180,7 @@ impl HttpDownloader { } .into()); } - let (job, promise) = self.add_job( + let job = self.add_job( Request { url: url.to_string(), options, @@ -186,13 +188,6 @@ impl HttpDownloader { }, true, )?; - promise.then_with( - None, - Some(Box::new(|_e: PhpMixed| { - // suppress error as it is rethrown to the caller by getResponse() a few lines below - PhpMixed::Null - })), - ); self.wait_id(Some(job.id))?; let response = self.get_response(job.id)?; @@ -213,7 +208,7 @@ impl HttpDownloader { } .into()); } - let (_, promise) = self.add_job( + let job = self.add_job( Request { url: url.to_string(), options, @@ -221,8 +216,9 @@ impl HttpDownloader { }, false, )?; + self.wait_id(Some(job.id))?; - Ok(promise) + self.get_response(job.id) } /// Copy a file synchronously @@ -239,7 +235,7 @@ impl HttpDownloader { } .into()); } - let (job, _) = self.add_job( + let job = self.add_job( Request { url: url.to_string(), options, @@ -266,7 +262,7 @@ impl HttpDownloader { } .into()); } - let (_, promise) = self.add_job( + let job = self.add_job( Request { url: url.to_string(), options, @@ -274,8 +270,9 @@ impl HttpDownloader { }, false, )?; + self.wait_id(Some(job.id))?; - Ok(promise) + self.get_response(job.id) } /// Retrieve the options set in the constructor @@ -289,27 +286,17 @@ impl HttpDownloader { } /// @phpstan-param Request $request - /// @return array{Job, PromiseInterface} - async fn add_job(&mut self, mut request: Request, sync: bool) -> Result<(JobHandle, Response)> { + /// + /// Queues a job and starts it if there is capacity. Mirrors PHP `addJob`: for non-curl (rfs) + /// jobs the work runs synchronously here (PHP runs it in the Promise resolver during + /// construction); for curl jobs the work is driven later by `start_job` / `count_active_jobs`. + fn add_job(&mut self, mut request: Request, sync: bool) -> Result<JobHandle> { request.options = array_replace_recursive(self.options.clone(), request.options); let id = self.id_gen; self.id_gen += 1; let origin = Url::get_origin(&*self.config.borrow(), &request.url); - let job = Job { - id, - status: Self::STATUS_QUEUED, - request: request.clone(), - sync, - origin: origin.clone(), - resolve: None, - reject: None, - curl_id: None, - response: None, - exception: None, - }; - if !sync && !self.allow_async { return Err(LogicException { message: @@ -346,34 +333,124 @@ impl HttpDownloader { ); } - // TODO(phase-b): build resolver/canceler closures bound to &mut self.jobs; needs Rc<RefCell> wiring - let _ = (&self.rfs, &self.curl); - - let resolver: Box<dyn Fn(Box<dyn Fn(PhpMixed)>, Box<dyn Fn(PhpMixed)>)> = - Box::new(|_resolve, _reject| { - // TODO(phase-b) - }); - let canceler: Box<dyn Fn()> = Box::new(|| { - // PHP canceler logic — TODO(phase-b) - let _ = IrrecoverableDownloadException(shirabe_php_shim::RuntimeException { - message: "Download canceled".to_string(), - code: 0, - }); - let _ = Url::sanitize(String::new()); - }); - let _ = (resolver, canceler); - - let promise = Promise::new(Box::new(|_resolve, _reject| {})); - // TODO(phase-b): wire promise.then() side-effects: mark job done & store response/exception - let promise: Box<dyn PromiseInterface> = Box::new(promise); - + let job = Job { + id, + status: Self::STATUS_QUEUED, + request: request.clone(), + sync, + origin, + curl_id: None, + response: None, + exception: None, + settled: std::sync::Arc::new(std::sync::Mutex::new(None)), + }; + let can_use_curl = self.can_use_curl(&job); self.jobs.insert(id, job); + // PHP runs the resolver synchronously while constructing the Promise. For non-curl jobs + // the resolver performs the blocking RemoteFilesystem download and resolves immediately. + if !can_use_curl { + self.run_rfs_job(id); + } + if self.running_jobs < self.max_jobs { self.start_job(id); } - Ok((JobHandle { id }, promise)) + Ok(JobHandle { id }) + } + + /// Mirrors the non-curl branch of PHP `addJob`'s Promise resolver plus the `.then` side + /// effects: performs the blocking RemoteFilesystem download and settles the job. + fn run_rfs_job(&mut self, id: i64) { + let (request, origin, url, options, copy_to) = { + let job = self.jobs.get(&id).unwrap(); + ( + job.request.clone(), + job.origin.clone(), + job.request.url.clone(), + job.request.options.clone(), + job.request.copy_to.clone(), + ) + }; + + if let Some(job) = self.jobs.get_mut(&id) { + job.status = Self::STATUS_STARTED; + } + + let result: anyhow::Result<Response> = { + let rfs = self.rfs.as_mut().unwrap(); + (|| -> anyhow::Result<Response> { + if let Some(copy_to) = copy_to.as_deref() { + rfs.copy(&origin, &url, copy_to, false, options.clone())?; + + let headers = rfs.get_last_headers().to_vec(); + let code = RemoteFilesystem::find_status_code(&headers); + let body = Some(format!("{}~", copy_to)); + match Response::new(Self::request_to_map(&request), code, headers, body)? { + Ok(r) => Ok(r), + Err(e) => Err(e.into()), + } + } else { + let body = match rfs.get_contents(&origin, &url, false, options.clone())? { + GetResult::Content(s) => Some(s), + _ => None, + }; + let headers = rfs.get_last_headers().to_vec(); + let code = RemoteFilesystem::find_status_code(&headers); + match Response::new(Self::request_to_map(&request), code, headers, body)? { + Ok(r) => Ok(r), + Err(e) => Err(e.into()), + } + } + })() + }; + + self.settle_job(id, result); + } + + /// PHP `new Response($job['request'], ...)` is fed the whole request array; reproduce it. + fn request_to_map(request: &Request) -> IndexMap<String, PhpMixed> { + let mut m: IndexMap<String, PhpMixed> = IndexMap::new(); + m.insert("url".to_string(), PhpMixed::String(request.url.clone())); + m.insert( + "options".to_string(), + PhpMixed::Array( + request + .options + .iter() + .map(|(k, v)| (k.clone(), Box::new(v.clone()))) + .collect(), + ), + ); + m.insert( + "copyTo".to_string(), + match &request.copy_to { + Some(s) => PhpMixed::String(s.clone()), + None => PhpMixed::Null, + }, + ); + m + } + + /// Applies the effect of PHP's promise `.then` handlers: records the response/exception, + /// transitions the job status and decrements the running-job counter. + fn settle_job(&mut self, id: i64, result: anyhow::Result<Response>) { + match result { + Ok(response) => { + if let Some(job) = self.jobs.get_mut(&id) { + job.status = Self::STATUS_COMPLETED; + job.response = Some(response); + } + } + Err(e) => { + if let Some(job) = self.jobs.get_mut(&id) { + job.status = Self::STATUS_FAILED; + job.exception = Some(e); + } + } + } + self.mark_job_done(); } fn start_job(&mut self, id: i64) { @@ -398,7 +475,6 @@ impl HttpDownloader { }; let url = request.url.clone(); let options = request.options.clone(); - let _ = origin; if self.disabled { let has_if_modified_since = { @@ -433,8 +509,13 @@ impl HttpDownloader { if has_if_modified_since { let mut req_map: IndexMap<String, PhpMixed> = IndexMap::new(); req_map.insert("url".to_string(), PhpMixed::String(url.clone())); - let _ = Response::new(req_map, Some(304), Vec::new(), Some(String::new())); - // job.resolve(response) — TODO(phase-b) + let response = + match Response::new(req_map, Some(304), Vec::new(), Some(String::new())) { + Ok(Ok(r)) => Ok(r), + Ok(Err(e)) => Err(e.into()), + Err(e) => Err(e), + }; + self.settle_job(id, response); } else { let mut e = TransportException::new( format!( @@ -444,15 +525,45 @@ impl HttpDownloader { 499, ); e.set_status_code(Some(499)); - // job.reject(e) — TODO(phase-b) - let _ = e; + self.settle_job(id, Err(e.into())); } return; } - let _ = copy_to; - // TODO(phase-b): try { curl->download(...) } catch (...) { reject(e) } + // curl branch: register the request with the curl multi handle. Completion is delivered + // asynchronously by curl.tick() into the job's `settled` slot (read by count_active_jobs). + // PHP catches any exception from download() and rejects the job. + let settled = self.jobs.get(&id).unwrap().settled.clone(); + let settled_for_reject = settled.clone(); + let resolve: Box<dyn Fn(PhpMixed) + Send + Sync> = Box::new(move |_response: PhpMixed| { + // TODO(phase-c-promise): curl.tick() delivers the response as PhpMixed here; convert it + // into a Response and store Ok(..) in `settled`. Bottoms at the todo!() curl I/O. + let _ = &settled; + }); + let reject: Box<dyn Fn(PhpMixed) + Send + Sync> = Box::new(move |_error: PhpMixed| { + // TODO(phase-c-promise): convert the PhpMixed error into anyhow::Error and store + // Err(..) in `settled`. Bottoms at the todo!() curl I/O. + let _ = &settled_for_reject; + }); + + let download_result = { + let curl = self.curl.as_mut().unwrap(); + curl.download(resolve, reject, &origin, &url, options, copy_to.as_deref()) + }; + match download_result { + Ok(Ok(curl_id)) => { + if let Some(job) = self.jobs.get_mut(&id) { + job.curl_id = Some(curl_id); + } + } + Ok(Err(e)) => { + self.settle_job(id, Err(e.into())); + } + Err(e) => { + self.settle_job(id, Err(e)); + } + } } fn mark_job_done(&mut self) { @@ -468,7 +579,7 @@ impl HttpDownloader { fn wait_id(&mut self, index: Option<i64>) -> Result<()> { loop { - let job_count = self.count_active_jobs(index); + let job_count = self.count_active_jobs(index)?; if job_count == 0 { break; } @@ -482,7 +593,7 @@ impl HttpDownloader { } /// @internal - pub fn count_active_jobs(&mut self, index: Option<i64>) -> i64 { + pub fn count_active_jobs(&mut self, index: Option<i64>) -> Result<i64> { if self.running_jobs < self.max_jobs { let queued_ids: Vec<i64> = self .jobs @@ -499,16 +610,32 @@ impl HttpDownloader { } if let Some(curl) = self.curl.as_mut() { - curl.tick(); + curl.tick()?; + } + + // Apply completions delivered by curl.tick() into each started job's `settled` slot. + // This reproduces the effect of PHP's resolve/reject callbacks firing during tick(). + let started_ids: Vec<i64> = self + .jobs + .values() + .filter(|j| j.status == Self::STATUS_STARTED) + .map(|j| j.id) + .collect(); + for id in started_ids { + let settled = self.jobs.get(&id).unwrap().settled.lock().unwrap().take(); + if let Some(result) = settled { + self.settle_job(id, result); + } } if let Some(index) = index { - return if self.jobs.get(&index).map(|j| j.status).unwrap_or(0) < Self::STATUS_COMPLETED - { - 1 - } else { - 0 - }; + return Ok( + if self.jobs.get(&index).map(|j| j.status).unwrap_or(0) < Self::STATUS_COMPLETED { + 1 + } else { + 0 + }, + ); } let mut active: i64 = 0; @@ -525,7 +652,7 @@ impl HttpDownloader { } } - active + Ok(active) } /// @param int $index Job id diff --git a/crates/shirabe/src/util/loop.rs b/crates/shirabe/src/util/loop.rs index 18f2359..5512211 100644 --- a/crates/shirabe/src/util/loop.rs +++ b/crates/shirabe/src/util/loop.rs @@ -3,25 +3,12 @@ use crate::util::HttpDownloader; use crate::util::ProcessExecutor; use anyhow::Result; -use indexmap::IndexMap; use shirabe_external_packages::symfony::component::console::helper::ProgressBar; -use shirabe_php_shim::microtime; +#[derive(Debug)] pub struct Loop { http_downloader: std::rc::Rc<std::cell::RefCell<HttpDownloader>>, process_executor: Option<std::rc::Rc<std::cell::RefCell<ProcessExecutor>>>, - current_promises: IndexMap<i64, Vec<Box<dyn PromiseInterface>>>, - wait_index: i64, -} - -impl std::fmt::Debug for Loop { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Loop") - .field("http_downloader", &self.http_downloader) - .field("process_executor", &self.process_executor) - .field("wait_index", &self.wait_index) - .finish() - } } impl Loop { @@ -39,8 +26,6 @@ impl Loop { Self { http_downloader, process_executor, - current_promises: IndexMap::new(), - wait_index: 0, } } @@ -54,65 +39,25 @@ impl Loop { self.process_executor.as_ref() } - pub fn wait( + pub async fn wait<'p>( &mut self, - promises: Vec<Box<dyn PromiseInterface>>, - mut progress: Option<&mut ProgressBar>, + promises: Vec<std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + 'p>>>, + _progress: Option<&mut ProgressBar>, ) -> Result<()> { - let uncaught: Option<anyhow::Error> = None; - - // TODO(phase-b): Promise::then captures uncaught by Fn; needs a Cell/RefCell wrapper - // and a thunk that matches FnOnce(Option<PhpMixed>) -> Option<PhpMixed>. - let _ = shirabe_external_packages::react::promise::all( - promises - .iter() - .map(|_| todo!("clone Box<dyn PromiseInterface>")) - .collect(), - ); - - // keep track of every group of promises that is waited on, so abortJobs can - // cancel them all, even if wait() was called within a wait() - let wait_index = self.wait_index; - self.wait_index += 1; - self.current_promises.insert(wait_index, promises); - - if let Some(ref mut progress) = progress { - let mut total_jobs: i64 = 0; - total_jobs += self.http_downloader.borrow_mut().count_active_jobs(None); - if let Some(ref pe) = self.process_executor { - total_jobs += pe.borrow_mut().count_active_jobs(None); - } - progress.start(Some(total_jobs)); - } - - let mut last_update: f64 = 0.0; - loop { - let mut active_jobs: i64 = 0; + let mut uncaught: Option<anyhow::Error> = None; - active_jobs += self.http_downloader.borrow_mut().count_active_jobs(None); - if let Some(ref pe) = self.process_executor { - active_jobs += pe.borrow_mut().count_active_jobs(None); - } - - if let Some(ref mut progress) = progress { - if microtime(true) - last_update > 0.1 { - last_update = microtime(true); - let new_progress = progress.get_max_steps() - active_jobs; - progress.set_progress(new_progress); + // TODO(phase-c-promise): the asynchronous worker classes (HttpDownloader / ProcessExecutor) + // run single-threaded for now, so the promises are consumed serially. Once the workers run + // on a multi-thread runtime these futures should be driven concurrently instead of in order. + // The PHP progress bar is tied to the worker active-job count and is also deferred until then. + for promise in promises { + if let Err(e) = promise.await { + if uncaught.is_none() { + uncaught = Some(e); } } - - if active_jobs == 0 { - break; - } } - // as we skip progress updates if they are too quick, make sure we do one last one here at 100% - if let Some(ref mut progress) = progress { - progress.finish(); - } - - self.current_promises.remove(&wait_index); if let Some(e) = uncaught { return Err(e); } @@ -121,11 +66,8 @@ impl Loop { } pub fn abort_jobs(&self) { - for promise_group in self.current_promises.values() { - for _promise in promise_group { - // TODO(phase-b): cancel requires CancellablePromiseInterface; PromiseInterface trait - // doesn't expose it. Drop the wrap+cancel until we have the right trait. - } - } + // TODO(phase-c-promise): no-op until a cancellation mechanism is introduced. PHP cancels + // every in-flight promise group it tracks in $currentPromises; reintroduce that tracking + // once the asynchronous workers support cancellation on a multi-thread runtime. } } diff --git a/crates/shirabe/src/util/process_executor.rs b/crates/shirabe/src/util/process_executor.rs index 1ee821c..74fc30c 100644 --- a/crates/shirabe/src/util/process_executor.rs +++ b/crates/shirabe/src/util/process_executor.rs @@ -53,8 +53,7 @@ struct Job { command: PhpMixed, cwd: Option<String>, process: Option<Process>, - resolve: Option<Box<dyn Fn(PhpMixed) + Send + Sync>>, - reject: Option<Box<dyn Fn(PhpMixed) + Send + Sync>>, + exception: Option<anyhow::Error>, } impl std::fmt::Debug for Job { @@ -396,32 +395,30 @@ impl ProcessExecutor { command, cwd: cwd_opt, process: None, - resolve: None, - reject: None, + exception: None, }; - // TODO(phase-b): build resolver/canceler closures bound to &mut self.jobs - let resolver: Box<dyn Fn(Option<PhpMixed>, Option<PhpMixed>)> = - Box::new(|_resolve, _reject| {}); - let canceler: Box<dyn Fn()> = Box::new(|| { - if defined("SIGINT") { - // job.process.signal(SIGINT) - } - // job.process.stop(1) - }); - let _ = (resolver, canceler); - - let promise = Promise::new(Box::new(|_resolve, _reject| {})); - // TODO(phase-b): wire promise.then() side-effects: mark job done & update status - let promise: Box<dyn PromiseInterface> = Box::new(promise); - self.jobs.insert(id, job); if self.running_jobs < self.max_jobs { self.start_job(id); } - Ok(promise) + // Drive the job to completion (serial pump). PHP resolves the promise with the Process + // once it stops running; here we await by pumping count_active_jobs and then hand back the + // Process (or the rejection captured during start_job). + self.wait_id(Some(id))?; + + let mut job = self.jobs.shift_remove(&id).unwrap(); + if let Some(process) = job.process.take() { + Ok(process) + } else if let Some(e) = job.exception.take() { + Err(e) + } else { + Err(anyhow::anyhow!( + "ProcessExecutor async job completed without a process" + )) + } } fn output_handler(&mut self, r#type: &str, buffer: &str) { @@ -496,8 +493,13 @@ impl ProcessExecutor { })(); let process = match process_result { Ok(p) => p, - Err(_e) => { - // job.reject(e) — TODO(phase-b) + Err(e) => { + // PHP: $job['reject']($e) — record the rejection and settle the job as failed. + if let Some(job) = self.jobs.get_mut(&id) { + job.status = Self::STATUS_FAILED; + job.exception = Some(e); + } + self.mark_job_done(); return; } }; @@ -544,7 +546,7 @@ impl ProcessExecutor { pub fn wait_id(&mut self, index: Option<i64>) -> Result<()> { loop { - if 0 == self.count_active_jobs(index) { + if 0 == self.count_active_jobs(index)? { return Ok(()); } @@ -558,7 +560,7 @@ impl ProcessExecutor { } /// @internal - pub fn count_active_jobs(&mut self, index: Option<i64>) -> i64 { + pub fn count_active_jobs(&mut self, index: Option<i64>) -> Result<i64> { // tick let ids: Vec<i64> = self.jobs.keys().copied().collect(); for id in &ids { @@ -575,18 +577,26 @@ impl ProcessExecutor { .map(|p| p.is_running()) .unwrap_or(false); if !is_running { - if let Some(job) = self.jobs.get(id) { - if let Some(resolve) = job.resolve.as_ref() { - let process_mixed = PhpMixed::Null; // TODO(phase-b): wrap Process as PhpMixed - resolve(process_mixed); - } + // PHP: call_user_func($job['resolve'], $job['process']) — the .then handler + // marks the job completed/failed based on the process exit status. + let successful = self + .jobs + .get(id) + .and_then(|j| j.process.as_ref()) + .map(|p| p.is_successful()) + .unwrap_or(false); + if let Some(job) = self.jobs.get_mut(id) { + job.status = if successful { + Self::STATUS_COMPLETED + } else { + Self::STATUS_FAILED + }; } + self.mark_job_done(); } - if let Some(job) = self.jobs.get_mut(id) { - if let Some(p) = job.process.as_mut() { - p.check_timeout(); - } + if let Some(p) = self.jobs.get(id).and_then(|j| j.process.as_ref()) { + p.check_timeout()?; } } } @@ -600,12 +610,13 @@ impl ProcessExecutor { } if let Some(index) = index { - return if self.jobs.get(&index).map(|j| j.status).unwrap_or(0) < Self::STATUS_COMPLETED - { - 1 - } else { - 0 - }; + return Ok( + if self.jobs.get(&index).map(|j| j.status).unwrap_or(0) < Self::STATUS_COMPLETED { + 1 + } else { + 0 + }, + ); } let mut active: i64 = 0; @@ -619,7 +630,7 @@ impl ProcessExecutor { } } - active + Ok(active) } fn mark_job_done(&mut self) { diff --git a/crates/shirabe/src/util/sync_helper.rs b/crates/shirabe/src/util/sync_helper.rs index 4c48c5c..7388b96 100644 --- a/crates/shirabe/src/util/sync_helper.rs +++ b/crates/shirabe/src/util/sync_helper.rs @@ -86,7 +86,6 @@ impl<'a> DownloaderOrManager<'a> { pub struct SyncHelper; impl SyncHelper { - // TODO(phase-c-promise): synchronous wrapper driving now-async downloader calls via Self::await (loop.wait); needs async/loop boundary design. pub fn download_and_install_package_sync( r#loop: &std::rc::Rc<std::cell::RefCell<Loop>>, downloader: DownloaderOrManager<'_>, @@ -100,21 +99,41 @@ impl SyncHelper { "install" }; - let result: Result<()> = (|| { + let result: Result<()> = (|| -> Result<()> { Self::r#await( r#loop, - Some(downloader.download(package, &path, prev_package)?), + Some(Box::pin(async { + downloader + .download(package, &path, prev_package) + .await + .map(|_| ()) + })), )?; Self::r#await( r#loop, - Some(downloader.prepare(r#type, package, &path, prev_package)?), + Some(Box::pin(async { + downloader + .prepare(r#type, package, &path, prev_package) + .await + .map(|_| ()) + })), )?; if r#type == "update" { if let Some(prev) = prev_package { - Self::r#await(r#loop, Some(downloader.update(package, prev, &path)?))?; + Self::r#await( + r#loop, + Some(Box::pin(async { + downloader.update(package, prev, &path).await.map(|_| ()) + })), + )?; } } else { - Self::r#await(r#loop, Some(downloader.install(package, &path)?))?; + Self::r#await( + r#loop, + Some(Box::pin(async { + downloader.install(package, &path).await.map(|_| ()) + })), + )?; } Ok(()) })(); @@ -122,25 +141,36 @@ impl SyncHelper { if result.is_err() { Self::r#await( r#loop, - Some(downloader.cleanup(r#type, package, &path, prev_package)?), + Some(Box::pin(async { + downloader + .cleanup(r#type, package, &path, prev_package) + .await + .map(|_| ()) + })), )?; return result; } Self::r#await( r#loop, - Some(downloader.cleanup(r#type, package, &path, prev_package)?), + Some(Box::pin(async { + downloader + .cleanup(r#type, package, &path, prev_package) + .await + .map(|_| ()) + })), )?; Ok(()) } - // TODO(phase-c-promise): loop-pump synchronous wait over a promise; driving mechanism needs design. pub fn r#await( r#loop: &std::rc::Rc<std::cell::RefCell<Loop>>, - promise: Option<Box<dyn PromiseInterface>>, + promise: Option<std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + '_>>>, ) -> Result<()> { if let Some(promise) = promise { - r#loop.borrow_mut().wait(vec![promise], None)?; + tokio::runtime::Runtime::new() + .unwrap() + .block_on(r#loop.borrow_mut().wait(vec![promise], None))?; } Ok(()) } |
