aboutsummaryrefslogtreecommitdiffhomepage
path: root/crates/shirabe/src/util/http_downloader.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/shirabe/src/util/http_downloader.rs')
-rw-r--r--crates/shirabe/src/util/http_downloader.rs267
1 files changed, 197 insertions, 70 deletions
diff --git a/crates/shirabe/src/util/http_downloader.rs b/crates/shirabe/src/util/http_downloader.rs
index 6f769ce..b1cf921 100644
--- a/crates/shirabe/src/util/http_downloader.rs
+++ b/crates/shirabe/src/util/http_downloader.rs
@@ -16,9 +16,9 @@ use crate::composer;
use crate::composer::ComposerHandle;
use crate::config::Config;
use crate::downloader::TransportException;
-use crate::exception::IrrecoverableDownloadException;
use crate::io::IOInterface;
use crate::package::version::VersionParser;
+use crate::util::GetResult;
use crate::util::Platform;
use crate::util::RemoteFilesystem;
use crate::util::StreamContextFactory;
@@ -60,11 +60,13 @@ struct Job {
request: Request,
sync: bool,
origin: String,
- resolve: Option<Box<dyn Fn(PhpMixed) + Send + Sync>>,
- reject: Option<Box<dyn Fn(PhpMixed) + Send + Sync>>,
curl_id: Option<i64>,
response: Option<Response>,
exception: Option<anyhow::Error>,
+ /// Completion slot written by the curl resolve/reject closures (driven by `curl.tick()`)
+ /// and read by `count_active_jobs`. Uses `Arc<Mutex>` because `CurlDownloader::download`
+ /// requires `Send + Sync` callbacks.
+ settled: std::sync::Arc<std::sync::Mutex<Option<anyhow::Result<Response>>>>,
}
impl std::fmt::Debug for Job {
@@ -178,7 +180,7 @@ impl HttpDownloader {
}
.into());
}
- let (job, promise) = self.add_job(
+ let job = self.add_job(
Request {
url: url.to_string(),
options,
@@ -186,13 +188,6 @@ impl HttpDownloader {
},
true,
)?;
- promise.then_with(
- None,
- Some(Box::new(|_e: PhpMixed| {
- // suppress error as it is rethrown to the caller by getResponse() a few lines below
- PhpMixed::Null
- })),
- );
self.wait_id(Some(job.id))?;
let response = self.get_response(job.id)?;
@@ -213,7 +208,7 @@ impl HttpDownloader {
}
.into());
}
- let (_, promise) = self.add_job(
+ let job = self.add_job(
Request {
url: url.to_string(),
options,
@@ -221,8 +216,9 @@ impl HttpDownloader {
},
false,
)?;
+ self.wait_id(Some(job.id))?;
- Ok(promise)
+ self.get_response(job.id)
}
/// Copy a file synchronously
@@ -239,7 +235,7 @@ impl HttpDownloader {
}
.into());
}
- let (job, _) = self.add_job(
+ let job = self.add_job(
Request {
url: url.to_string(),
options,
@@ -266,7 +262,7 @@ impl HttpDownloader {
}
.into());
}
- let (_, promise) = self.add_job(
+ let job = self.add_job(
Request {
url: url.to_string(),
options,
@@ -274,8 +270,9 @@ impl HttpDownloader {
},
false,
)?;
+ self.wait_id(Some(job.id))?;
- Ok(promise)
+ self.get_response(job.id)
}
/// Retrieve the options set in the constructor
@@ -289,27 +286,17 @@ impl HttpDownloader {
}
/// @phpstan-param Request $request
- /// @return array{Job, PromiseInterface}
- async fn add_job(&mut self, mut request: Request, sync: bool) -> Result<(JobHandle, Response)> {
+ ///
+ /// Queues a job and starts it if there is capacity. Mirrors PHP `addJob`: for non-curl (rfs)
+ /// jobs the work runs synchronously here (PHP runs it in the Promise resolver during
+ /// construction); for curl jobs the work is driven later by `start_job` / `count_active_jobs`.
+ fn add_job(&mut self, mut request: Request, sync: bool) -> Result<JobHandle> {
request.options = array_replace_recursive(self.options.clone(), request.options);
let id = self.id_gen;
self.id_gen += 1;
let origin = Url::get_origin(&*self.config.borrow(), &request.url);
- let job = Job {
- id,
- status: Self::STATUS_QUEUED,
- request: request.clone(),
- sync,
- origin: origin.clone(),
- resolve: None,
- reject: None,
- curl_id: None,
- response: None,
- exception: None,
- };
-
if !sync && !self.allow_async {
return Err(LogicException {
message:
@@ -346,34 +333,124 @@ impl HttpDownloader {
);
}
- // TODO(phase-b): build resolver/canceler closures bound to &mut self.jobs; needs Rc<RefCell> wiring
- let _ = (&self.rfs, &self.curl);
-
- let resolver: Box<dyn Fn(Box<dyn Fn(PhpMixed)>, Box<dyn Fn(PhpMixed)>)> =
- Box::new(|_resolve, _reject| {
- // TODO(phase-b)
- });
- let canceler: Box<dyn Fn()> = Box::new(|| {
- // PHP canceler logic — TODO(phase-b)
- let _ = IrrecoverableDownloadException(shirabe_php_shim::RuntimeException {
- message: "Download canceled".to_string(),
- code: 0,
- });
- let _ = Url::sanitize(String::new());
- });
- let _ = (resolver, canceler);
-
- let promise = Promise::new(Box::new(|_resolve, _reject| {}));
- // TODO(phase-b): wire promise.then() side-effects: mark job done & store response/exception
- let promise: Box<dyn PromiseInterface> = Box::new(promise);
-
+ let job = Job {
+ id,
+ status: Self::STATUS_QUEUED,
+ request: request.clone(),
+ sync,
+ origin,
+ curl_id: None,
+ response: None,
+ exception: None,
+ settled: std::sync::Arc::new(std::sync::Mutex::new(None)),
+ };
+ let can_use_curl = self.can_use_curl(&job);
self.jobs.insert(id, job);
+ // PHP runs the resolver synchronously while constructing the Promise. For non-curl jobs
+ // the resolver performs the blocking RemoteFilesystem download and resolves immediately.
+ if !can_use_curl {
+ self.run_rfs_job(id);
+ }
+
if self.running_jobs < self.max_jobs {
self.start_job(id);
}
- Ok((JobHandle { id }, promise))
+ Ok(JobHandle { id })
+ }
+
+ /// Mirrors the non-curl branch of PHP `addJob`'s Promise resolver plus the `.then` side
+ /// effects: performs the blocking RemoteFilesystem download and settles the job.
+ fn run_rfs_job(&mut self, id: i64) {
+ let (request, origin, url, options, copy_to) = {
+ let job = self.jobs.get(&id).unwrap();
+ (
+ job.request.clone(),
+ job.origin.clone(),
+ job.request.url.clone(),
+ job.request.options.clone(),
+ job.request.copy_to.clone(),
+ )
+ };
+
+ if let Some(job) = self.jobs.get_mut(&id) {
+ job.status = Self::STATUS_STARTED;
+ }
+
+ let result: anyhow::Result<Response> = {
+ let rfs = self.rfs.as_mut().unwrap();
+ (|| -> anyhow::Result<Response> {
+ if let Some(copy_to) = copy_to.as_deref() {
+ rfs.copy(&origin, &url, copy_to, false, options.clone())?;
+
+ let headers = rfs.get_last_headers().to_vec();
+ let code = RemoteFilesystem::find_status_code(&headers);
+ let body = Some(format!("{}~", copy_to));
+ match Response::new(Self::request_to_map(&request), code, headers, body)? {
+ Ok(r) => Ok(r),
+ Err(e) => Err(e.into()),
+ }
+ } else {
+ let body = match rfs.get_contents(&origin, &url, false, options.clone())? {
+ GetResult::Content(s) => Some(s),
+ _ => None,
+ };
+ let headers = rfs.get_last_headers().to_vec();
+ let code = RemoteFilesystem::find_status_code(&headers);
+ match Response::new(Self::request_to_map(&request), code, headers, body)? {
+ Ok(r) => Ok(r),
+ Err(e) => Err(e.into()),
+ }
+ }
+ })()
+ };
+
+ self.settle_job(id, result);
+ }
+
+ /// PHP `new Response($job['request'], ...)` is fed the whole request array; reproduce it.
+ fn request_to_map(request: &Request) -> IndexMap<String, PhpMixed> {
+ let mut m: IndexMap<String, PhpMixed> = IndexMap::new();
+ m.insert("url".to_string(), PhpMixed::String(request.url.clone()));
+ m.insert(
+ "options".to_string(),
+ PhpMixed::Array(
+ request
+ .options
+ .iter()
+ .map(|(k, v)| (k.clone(), Box::new(v.clone())))
+ .collect(),
+ ),
+ );
+ m.insert(
+ "copyTo".to_string(),
+ match &request.copy_to {
+ Some(s) => PhpMixed::String(s.clone()),
+ None => PhpMixed::Null,
+ },
+ );
+ m
+ }
+
+ /// Applies the effect of PHP's promise `.then` handlers: records the response/exception,
+ /// transitions the job status and decrements the running-job counter.
+ fn settle_job(&mut self, id: i64, result: anyhow::Result<Response>) {
+ match result {
+ Ok(response) => {
+ if let Some(job) = self.jobs.get_mut(&id) {
+ job.status = Self::STATUS_COMPLETED;
+ job.response = Some(response);
+ }
+ }
+ Err(e) => {
+ if let Some(job) = self.jobs.get_mut(&id) {
+ job.status = Self::STATUS_FAILED;
+ job.exception = Some(e);
+ }
+ }
+ }
+ self.mark_job_done();
}
fn start_job(&mut self, id: i64) {
@@ -398,7 +475,6 @@ impl HttpDownloader {
};
let url = request.url.clone();
let options = request.options.clone();
- let _ = origin;
if self.disabled {
let has_if_modified_since = {
@@ -433,8 +509,13 @@ impl HttpDownloader {
if has_if_modified_since {
let mut req_map: IndexMap<String, PhpMixed> = IndexMap::new();
req_map.insert("url".to_string(), PhpMixed::String(url.clone()));
- let _ = Response::new(req_map, Some(304), Vec::new(), Some(String::new()));
- // job.resolve(response) — TODO(phase-b)
+ let response =
+ match Response::new(req_map, Some(304), Vec::new(), Some(String::new())) {
+ Ok(Ok(r)) => Ok(r),
+ Ok(Err(e)) => Err(e.into()),
+ Err(e) => Err(e),
+ };
+ self.settle_job(id, response);
} else {
let mut e = TransportException::new(
format!(
@@ -444,15 +525,45 @@ impl HttpDownloader {
499,
);
e.set_status_code(Some(499));
- // job.reject(e) — TODO(phase-b)
- let _ = e;
+ self.settle_job(id, Err(e.into()));
}
return;
}
- let _ = copy_to;
- // TODO(phase-b): try { curl->download(...) } catch (...) { reject(e) }
+ // curl branch: register the request with the curl multi handle. Completion is delivered
+ // asynchronously by curl.tick() into the job's `settled` slot (read by count_active_jobs).
+ // PHP catches any exception from download() and rejects the job.
+ let settled = self.jobs.get(&id).unwrap().settled.clone();
+ let settled_for_reject = settled.clone();
+ let resolve: Box<dyn Fn(PhpMixed) + Send + Sync> = Box::new(move |_response: PhpMixed| {
+ // TODO(phase-c-promise): curl.tick() delivers the response as PhpMixed here; convert it
+ // into a Response and store Ok(..) in `settled`. Bottoms at the todo!() curl I/O.
+ let _ = &settled;
+ });
+ let reject: Box<dyn Fn(PhpMixed) + Send + Sync> = Box::new(move |_error: PhpMixed| {
+ // TODO(phase-c-promise): convert the PhpMixed error into anyhow::Error and store
+ // Err(..) in `settled`. Bottoms at the todo!() curl I/O.
+ let _ = &settled_for_reject;
+ });
+
+ let download_result = {
+ let curl = self.curl.as_mut().unwrap();
+ curl.download(resolve, reject, &origin, &url, options, copy_to.as_deref())
+ };
+ match download_result {
+ Ok(Ok(curl_id)) => {
+ if let Some(job) = self.jobs.get_mut(&id) {
+ job.curl_id = Some(curl_id);
+ }
+ }
+ Ok(Err(e)) => {
+ self.settle_job(id, Err(e.into()));
+ }
+ Err(e) => {
+ self.settle_job(id, Err(e));
+ }
+ }
}
fn mark_job_done(&mut self) {
@@ -468,7 +579,7 @@ impl HttpDownloader {
fn wait_id(&mut self, index: Option<i64>) -> Result<()> {
loop {
- let job_count = self.count_active_jobs(index);
+ let job_count = self.count_active_jobs(index)?;
if job_count == 0 {
break;
}
@@ -482,7 +593,7 @@ impl HttpDownloader {
}
/// @internal
- pub fn count_active_jobs(&mut self, index: Option<i64>) -> i64 {
+ pub fn count_active_jobs(&mut self, index: Option<i64>) -> Result<i64> {
if self.running_jobs < self.max_jobs {
let queued_ids: Vec<i64> = self
.jobs
@@ -499,16 +610,32 @@ impl HttpDownloader {
}
if let Some(curl) = self.curl.as_mut() {
- curl.tick();
+ curl.tick()?;
+ }
+
+ // Apply completions delivered by curl.tick() into each started job's `settled` slot.
+ // This reproduces the effect of PHP's resolve/reject callbacks firing during tick().
+ let started_ids: Vec<i64> = self
+ .jobs
+ .values()
+ .filter(|j| j.status == Self::STATUS_STARTED)
+ .map(|j| j.id)
+ .collect();
+ for id in started_ids {
+ let settled = self.jobs.get(&id).unwrap().settled.lock().unwrap().take();
+ if let Some(result) = settled {
+ self.settle_job(id, result);
+ }
}
if let Some(index) = index {
- return if self.jobs.get(&index).map(|j| j.status).unwrap_or(0) < Self::STATUS_COMPLETED
- {
- 1
- } else {
- 0
- };
+ return Ok(
+ if self.jobs.get(&index).map(|j| j.status).unwrap_or(0) < Self::STATUS_COMPLETED {
+ 1
+ } else {
+ 0
+ },
+ );
}
let mut active: i64 = 0;
@@ -525,7 +652,7 @@ impl HttpDownloader {
}
}
- active
+ Ok(active)
}
/// @param int $index Job id