aboutsummaryrefslogtreecommitdiffhomepage
path: root/backend/taskqueue/processor_wrapper.go
diff options
context:
space:
mode:
authornsfisis <nsfisis@gmail.com>2024-08-08 00:46:45 +0900
committernsfisis <nsfisis@gmail.com>2024-08-08 04:02:37 +0900
commit339f99a26191ede59a9eb0de2819cb5efdeb1535 (patch)
tree37d9437c6e6865acfc5f6edd06f27b8be2a7b22b /backend/taskqueue/processor_wrapper.go
parentc173778cefd66e5259ad75c9399a95c03ce5fa91 (diff)
downloadiosdc-japan-2024-albatross-339f99a26191ede59a9eb0de2819cb5efdeb1535.tar.gz
iosdc-japan-2024-albatross-339f99a26191ede59a9eb0de2819cb5efdeb1535.tar.zst
iosdc-japan-2024-albatross-339f99a26191ede59a9eb0de2819cb5efdeb1535.zip
refactor(backend): wrap taskqueue.processor
Diffstat (limited to 'backend/taskqueue/processor_wrapper.go')
-rw-r--r--backend/taskqueue/processor_wrapper.go110
1 files changed, 110 insertions, 0 deletions
diff --git a/backend/taskqueue/processor_wrapper.go b/backend/taskqueue/processor_wrapper.go
new file mode 100644
index 0000000..1ae9f2f
--- /dev/null
+++ b/backend/taskqueue/processor_wrapper.go
@@ -0,0 +1,110 @@
+package taskqueue
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+
+ "github.com/hibiken/asynq"
+)
+
+type processorWrapper struct {
+ impl processor
+ results chan TaskResult
+}
+
+func newProcessorWrapper(impl processor) *processorWrapper {
+ return &processorWrapper{
+ impl: impl,
+ results: make(chan TaskResult),
+ }
+}
+
+func (p *processorWrapper) processTaskCompileSwiftToWasm(ctx context.Context, t *asynq.Task) error {
+ var payload TaskPayloadCompileSwiftToWasm
+ if err := json.Unmarshal(t.Payload(), &payload); err != nil {
+ err := fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
+ p.results <- &TaskResultCompileSwiftToWasm{Err: err}
+ return err
+ }
+
+ result, err := p.impl.doProcessTaskCompileSwiftToWasm(ctx, &payload)
+ if err != nil {
+ retryCount, _ := asynq.GetRetryCount(ctx)
+ maxRetry, _ := asynq.GetMaxRetry(ctx)
+ isRecoverable := !errors.Is(err, asynq.SkipRetry) && retryCount < maxRetry
+ if !isRecoverable {
+ p.results <- &TaskResultCompileSwiftToWasm{Err: err}
+ }
+ return err
+ }
+ p.results <- result
+ return nil
+}
+
+func (p *processorWrapper) processTaskCompileWasmToNativeExecutable(ctx context.Context, t *asynq.Task) error {
+ var payload TaskPayloadCompileWasmToNativeExecutable
+ if err := json.Unmarshal(t.Payload(), &payload); err != nil {
+ err := fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
+ p.results <- &TaskResultCompileWasmToNativeExecutable{Err: err}
+ return err
+ }
+
+ result, err := p.impl.doProcessTaskCompileWasmToNativeExecutable(ctx, &payload)
+ if err != nil {
+ retryCount, _ := asynq.GetRetryCount(ctx)
+ maxRetry, _ := asynq.GetMaxRetry(ctx)
+ isRecoverable := !errors.Is(err, asynq.SkipRetry) && retryCount < maxRetry
+ if !isRecoverable {
+ p.results <- &TaskResultCompileWasmToNativeExecutable{Err: err}
+ }
+ return err
+ }
+ p.results <- result
+ return nil
+}
+
+func (p *processorWrapper) processTaskCreateSubmissionRecord(ctx context.Context, t *asynq.Task) error {
+ var payload TaskPayloadCreateSubmissionRecord
+ if err := json.Unmarshal(t.Payload(), &payload); err != nil {
+ err := fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
+ p.results <- &TaskResultCreateSubmissionRecord{Err: err}
+ return err
+ }
+
+ result, err := p.impl.doProcessTaskCreateSubmissionRecord(ctx, &payload)
+ if err != nil {
+ retryCount, _ := asynq.GetRetryCount(ctx)
+ maxRetry, _ := asynq.GetMaxRetry(ctx)
+ isRecoverable := !errors.Is(err, asynq.SkipRetry) && retryCount < maxRetry
+ if !isRecoverable {
+ p.results <- &TaskResultCreateSubmissionRecord{Err: err}
+ }
+ return err
+ }
+ p.results <- result
+ return nil
+}
+
+func (p *processorWrapper) processTaskRunTestcase(ctx context.Context, t *asynq.Task) error {
+ var payload TaskPayloadRunTestcase
+ if err := json.Unmarshal(t.Payload(), &payload); err != nil {
+ err := fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
+ p.results <- &TaskResultRunTestcase{Err: err}
+ return err
+ }
+
+ result, err := p.impl.doProcessTaskRunTestcase(ctx, &payload)
+ if err != nil {
+ retryCount, _ := asynq.GetRetryCount(ctx)
+ maxRetry, _ := asynq.GetMaxRetry(ctx)
+ isRecoverable := !errors.Is(err, asynq.SkipRetry) && retryCount < maxRetry
+ if !isRecoverable {
+ p.results <- &TaskResultRunTestcase{Err: err}
+ }
+ return err
+ }
+ p.results <- result
+ return nil
+}