aboutsummaryrefslogtreecommitdiffhomepage
path: root/crates/shirabe/src/util/loop.rs
blob: 9ffee8fa2d1e4e1f1ef06ae99e1f45ee5caf43ac (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
//! ref: composer/src/Composer/Util/Loop.php

use crate::util::http_downloader::HttpDownloader;
use crate::util::process_executor::ProcessExecutor;
use anyhow::Result;
use indexmap::IndexMap;
use shirabe_external_packages::react::promise::promise_interface::PromiseInterface;
use shirabe_external_packages::symfony::component::console::helper::progress_bar::ProgressBar;
use shirabe_php_shim::microtime;

#[derive(Debug)]
pub struct Loop {
    http_downloader: std::rc::Rc<std::cell::RefCell<HttpDownloader>>,
    process_executor: Option<std::rc::Rc<std::cell::RefCell<ProcessExecutor>>>,
    current_promises: IndexMap<i64, Vec<Box<dyn PromiseInterface>>>,
    wait_index: i64,
}

impl Loop {
    pub fn new(
        http_downloader: std::rc::Rc<std::cell::RefCell<HttpDownloader>>,
        process_executor: Option<std::rc::Rc<std::cell::RefCell<ProcessExecutor>>>,
    ) -> Self {
        http_downloader.borrow_mut().enable_async();

        let process_executor = process_executor.map(|pe| {
            pe.borrow_mut().enable_async();
            pe
        });

        Self {
            http_downloader,
            process_executor,
            current_promises: IndexMap::new(),
            wait_index: 0,
        }
    }

    pub fn get_http_downloader(&self) -> &std::rc::Rc<std::cell::RefCell<HttpDownloader>> {
        &self.http_downloader
    }

    pub fn get_process_executor(
        &self,
    ) -> Option<&std::rc::Rc<std::cell::RefCell<ProcessExecutor>>> {
        self.process_executor.as_ref()
    }

    pub fn wait(
        &mut self,
        promises: Vec<Box<dyn PromiseInterface>>,
        progress: Option<&mut ProgressBar>,
    ) -> Result<()> {
        let mut uncaught: Option<anyhow::Error> = None;

        shirabe_external_packages::react::promise::all(&promises).then(
            || {},
            |e: anyhow::Error| {
                uncaught = Some(e);
            },
        );

        // keep track of every group of promises that is waited on, so abortJobs can
        // cancel them all, even if wait() was called within a wait()
        let wait_index = self.wait_index;
        self.wait_index += 1;
        self.current_promises.insert(wait_index, promises);

        if let Some(ref progress) = progress {
            let mut total_jobs: i64 = 0;
            total_jobs += self.http_downloader.borrow_mut().count_active_jobs(None);
            if let Some(ref pe) = self.process_executor {
                total_jobs += pe.borrow_mut().count_active_jobs(None);
            }
            progress.start(total_jobs);
        }

        let mut last_update: f64 = 0.0;
        loop {
            let mut active_jobs: i64 = 0;

            active_jobs += self.http_downloader.borrow_mut().count_active_jobs(None);
            if let Some(ref pe) = self.process_executor {
                active_jobs += pe.borrow_mut().count_active_jobs(None);
            }

            if let Some(ref progress) = progress {
                if microtime(true) - last_update > 0.1 {
                    last_update = microtime(true);
                    progress.set_progress(progress.get_max_steps() - active_jobs);
                }
            }

            if active_jobs == 0 {
                break;
            }
        }

        // as we skip progress updates if they are too quick, make sure we do one last one here at 100%
        if let Some(ref progress) = progress {
            progress.finish();
        }

        self.current_promises.remove(&wait_index);
        if let Some(e) = uncaught {
            return Err(e);
        }

        Ok(())
    }

    pub fn abort_jobs(&self) {
        for promise_group in self.current_promises.values() {
            for promise in promise_group {
                // to support react/promise 2.x we wrap the promise in a resolve() call for safety
                shirabe_external_packages::react::promise::resolve(Some(promise)).cancel();
            }
        }
    }
}