aboutsummaryrefslogtreecommitdiffhomepage
path: root/backend
diff options
context:
space:
mode:
authornsfisis <nsfisis@gmail.com>2024-08-08 00:46:45 +0900
committernsfisis <nsfisis@gmail.com>2024-08-08 04:02:37 +0900
commit339f99a26191ede59a9eb0de2819cb5efdeb1535 (patch)
tree37d9437c6e6865acfc5f6edd06f27b8be2a7b22b /backend
parentc173778cefd66e5259ad75c9399a95c03ce5fa91 (diff)
downloadiosdc-japan-2025-albatross-339f99a26191ede59a9eb0de2819cb5efdeb1535.tar.gz
iosdc-japan-2025-albatross-339f99a26191ede59a9eb0de2819cb5efdeb1535.tar.zst
iosdc-japan-2025-albatross-339f99a26191ede59a9eb0de2819cb5efdeb1535.zip
refactor(backend): wrap taskqueue.processor
Diffstat (limited to 'backend')
-rw-r--r--backend/taskqueue/processor.go168
-rw-r--r--backend/taskqueue/processor_wrapper.go110
-rw-r--r--backend/taskqueue/worker_server.go4
3 files changed, 185 insertions, 97 deletions
diff --git a/backend/taskqueue/processor.go b/backend/taskqueue/processor.go
index 556bd78..d771e61 100644
--- a/backend/taskqueue/processor.go
+++ b/backend/taskqueue/processor.go
@@ -8,32 +8,24 @@ import (
"net/http"
"strings"
- "github.com/hibiken/asynq"
-
"github.com/nsfisis/iosdc-japan-2024-albatross/backend/db"
)
type processor struct {
- q *db.Queries
- results chan TaskResult
+ q *db.Queries
}
-func newProcessor(q *db.Queries) *processor {
- return &processor{
- q: q,
- results: make(chan TaskResult),
+func newProcessor(q *db.Queries) processor {
+ return processor{
+ q: q,
}
}
-func (p *processor) processTaskCreateSubmissionRecord(ctx context.Context, t *asynq.Task) error {
- var payload TaskPayloadCreateSubmissionRecord
- if err := json.Unmarshal(t.Payload(), &payload); err != nil {
- // TODO: send result
- return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
- }
-
+func (p *processor) doProcessTaskCreateSubmissionRecord(
+ ctx context.Context,
+ payload *TaskPayloadCreateSubmissionRecord,
+) (*TaskResultCreateSubmissionRecord, error) {
// TODO: upsert
- // Create submission record.
submissionID, err := p.q.CreateSubmission(ctx, db.CreateSubmissionParams{
GameID: int32(payload.GameID()),
UserID: int32(payload.UserID()),
@@ -41,24 +33,19 @@ func (p *processor) processTaskCreateSubmissionRecord(ctx context.Context, t *as
CodeSize: int32(payload.CodeSize),
})
if err != nil {
- // TODO: send result
- return fmt.Errorf("CreateSubmission failed: %v", err)
+ return nil, err
}
- p.results <- &TaskResultCreateSubmissionRecord{
- TaskPayload: &payload,
+ return &TaskResultCreateSubmissionRecord{
+ TaskPayload: payload,
SubmissionID: int(submissionID),
- }
- return nil
+ }, nil
}
-func (p *processor) processTaskCompileSwiftToWasm(ctx context.Context, t *asynq.Task) error {
- var payload TaskPayloadCompileSwiftToWasm
- if err := json.Unmarshal(t.Payload(), &payload); err != nil {
- // TODO: send result
- return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
- }
-
+func (p *processor) doProcessTaskCompileSwiftToWasm(
+ ctx context.Context,
+ payload *TaskPayloadCompileSwiftToWasm,
+) (*TaskResultCompileSwiftToWasm, error) {
type swiftcRequestData struct {
MaxDuration int `json:"max_duration_ms"`
Code string `json:"code"`
@@ -74,18 +61,15 @@ func (p *processor) processTaskCompileSwiftToWasm(ctx context.Context, t *asynq.
}
reqJson, err := json.Marshal(reqData)
if err != nil {
- // TODO: send result
- return fmt.Errorf("json.Marshal failed: %v", err)
+ return nil, fmt.Errorf("json.Marshal failed: %v", err)
}
res, err := http.Post("http://worker:80/api/swiftc", "application/json", bytes.NewBuffer(reqJson))
if err != nil {
- // TODO: send result
- return fmt.Errorf("http.Post failed: %v", err)
+ return nil, fmt.Errorf("http.Post failed: %v", err)
}
resData := swiftcResponseData{}
if err := json.NewDecoder(res.Body).Decode(&resData); err != nil {
- // TODO: send result
- return fmt.Errorf("json.Decode failed: %v", err)
+ return nil, fmt.Errorf("json.Decode failed: %v", err)
}
if resData.Status != "success" {
err := p.q.CreateSubmissionResult(ctx, db.CreateSubmissionResultParams{
@@ -95,27 +79,28 @@ func (p *processor) processTaskCompileSwiftToWasm(ctx context.Context, t *asynq.
Stderr: resData.Stderr,
})
if err != nil {
- // TODO: send result
- return fmt.Errorf("CreateTestcaseResult failed: %v", err)
+ return nil, fmt.Errorf("CreateTestcaseResult failed: %v", err)
}
- p.results <- &TaskResultCompileSwiftToWasm{
- TaskPayload: &payload,
+ return &TaskResultCompileSwiftToWasm{
+ TaskPayload: payload,
Status: "compile_error",
- }
- return fmt.Errorf("swiftc failed: %v", resData.Stderr)
+ Stdout: resData.Stdout,
+ Stderr: resData.Stderr,
+ }, nil
}
- // TODO: send result
- return nil
+ return &TaskResultCompileSwiftToWasm{
+ TaskPayload: payload,
+ Status: "success",
+ Stdout: resData.Stdout,
+ Stderr: resData.Stderr,
+ }, nil
}
-func (p *processor) processTaskCompileWasmToNativeExecutable(ctx context.Context, t *asynq.Task) error {
- var payload TaskPayloadCompileWasmToNativeExecutable
- if err := json.Unmarshal(t.Payload(), &payload); err != nil {
- // TODO: send result
- return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
- }
-
+func (p *processor) doProcessTaskCompileWasmToNativeExecutable(
+ ctx context.Context,
+ payload *TaskPayloadCompileWasmToNativeExecutable,
+) (*TaskResultCompileWasmToNativeExecutable, error) {
type wasmcRequestData struct {
MaxDuration int `json:"max_duration_ms"`
Code string `json:"code"`
@@ -131,18 +116,15 @@ func (p *processor) processTaskCompileWasmToNativeExecutable(ctx context.Context
}
reqJson, err := json.Marshal(reqData)
if err != nil {
- // TODO: send result
- return fmt.Errorf("json.Marshal failed: %v", err)
+ return nil, fmt.Errorf("json.Marshal failed: %v", err)
}
res, err := http.Post("http://worker:80/api/wasmc", "application/json", bytes.NewBuffer(reqJson))
if err != nil {
- // TODO: send result
- return fmt.Errorf("http.Post failed: %v", err)
+ return nil, fmt.Errorf("http.Post failed: %v", err)
}
resData := wasmcResponseData{}
if err := json.NewDecoder(res.Body).Decode(&resData); err != nil {
- // TODO: send result
- return fmt.Errorf("json.Decode failed: %v", err)
+ return nil, fmt.Errorf("json.Decode failed: %v", err)
}
if resData.Status != "success" {
err := p.q.CreateSubmissionResult(ctx, db.CreateSubmissionResultParams{
@@ -152,31 +134,31 @@ func (p *processor) processTaskCompileWasmToNativeExecutable(ctx context.Context
Stderr: resData.Stderr,
})
if err != nil {
- // TODO: send result
- return fmt.Errorf("CreateTestcaseResult failed: %v", err)
+ return nil, fmt.Errorf("CreateTestcaseResult failed: %v", err)
}
- p.results <- &TaskResultCompileWasmToNativeExecutable{
- TaskPayload: &payload,
+ return &TaskResultCompileWasmToNativeExecutable{
+ TaskPayload: payload,
Status: "compile_error",
- }
- return fmt.Errorf("wasmc failed: %v", resData.Stderr)
+ Stdout: resData.Stdout,
+ Stderr: resData.Stderr,
+ }, nil
}
- // TODO: send result
- return nil
+ return &TaskResultCompileWasmToNativeExecutable{
+ TaskPayload: payload,
+ Status: "success",
+ Stdout: resData.Stdout,
+ Stderr: resData.Stderr,
+ }, nil
}
-func (p *processor) processTaskRunTestcase(ctx context.Context, t *asynq.Task) error {
- var payload TaskPayloadRunTestcase
- if err := json.Unmarshal(t.Payload(), &payload); err != nil {
- // TODO: send result
- return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
- }
-
+func (p *processor) doProcessTaskRunTestcase(
+ ctx context.Context,
+ payload *TaskPayloadRunTestcase,
+) (*TaskResultRunTestcase, error) {
testcases, err := p.q.ListTestcasesByGameID(ctx, int32(payload.GameID()))
if err != nil {
- // TODO: send result
- return fmt.Errorf("ListTestcasesByGameID failed: %v", err)
+ return nil, fmt.Errorf("ListTestcasesByGameID failed: %v", err)
}
for _, testcase := range testcases {
@@ -197,18 +179,15 @@ func (p *processor) processTaskRunTestcase(ctx context.Context, t *asynq.Task) e
}
reqJson, err := json.Marshal(reqData)
if err != nil {
- // TODO: send result
- return fmt.Errorf("json.Marshal failed: %v", err)
+ return nil, fmt.Errorf("json.Marshal failed: %v", err)
}
res, err := http.Post("http://worker:80/api/testrun", "application/json", bytes.NewBuffer(reqJson))
if err != nil {
- // TODO: send result
- return fmt.Errorf("http.Post failed: %v", err)
+ return nil, fmt.Errorf("http.Post failed: %v", err)
}
resData := testrunResponseData{}
if err := json.NewDecoder(res.Body).Decode(&resData); err != nil {
- // TODO: send result
- return fmt.Errorf("json.Decode failed: %v", err)
+ return nil, fmt.Errorf("json.Decode failed: %v", err)
}
if resData.Status != "success" {
err := p.q.CreateTestcaseResult(ctx, db.CreateTestcaseResultParams{
@@ -219,14 +198,14 @@ func (p *processor) processTaskRunTestcase(ctx context.Context, t *asynq.Task) e
Stderr: resData.Stderr,
})
if err != nil {
- // TODO: send result
- return fmt.Errorf("CreateTestcaseResult failed: %v", err)
+ return nil, fmt.Errorf("CreateTestcaseResult failed: %v", err)
}
- p.results <- &TaskResultRunTestcase{
- TaskPayload: &payload,
+ return &TaskResultRunTestcase{
+ TaskPayload: payload,
Status: resData.Status,
- }
- return fmt.Errorf("testrun failed: %v", resData.Stderr)
+ Stdout: resData.Stdout,
+ Stderr: resData.Stderr,
+ }, nil
}
if !isTestcaseResultCorrect(testcase.Stdout, resData.Stdout) {
err := p.q.CreateTestcaseResult(ctx, db.CreateTestcaseResultParams{
@@ -237,22 +216,21 @@ func (p *processor) processTaskRunTestcase(ctx context.Context, t *asynq.Task) e
Stderr: resData.Stderr,
})
if err != nil {
- // TODO: send result
- return fmt.Errorf("CreateTestcaseResult failed: %v", err)
+ return nil, fmt.Errorf("CreateTestcaseResult failed: %v", err)
}
- p.results <- &TaskResultRunTestcase{
- TaskPayload: &payload,
+ return &TaskResultRunTestcase{
+ TaskPayload: payload,
Status: "wrong_answer",
- }
- return fmt.Errorf("testrun failed: %v", resData.Stdout)
+ Stdout: resData.Stdout,
+ Stderr: resData.Stderr,
+ }, nil
}
}
- p.results <- &TaskResultRunTestcase{
- TaskPayload: &payload,
+ return &TaskResultRunTestcase{
+ TaskPayload: payload,
Status: "success",
- }
- return nil
+ }, nil
}
func isTestcaseResultCorrect(expectedStdout, actualStdout string) bool {
diff --git a/backend/taskqueue/processor_wrapper.go b/backend/taskqueue/processor_wrapper.go
new file mode 100644
index 0000000..1ae9f2f
--- /dev/null
+++ b/backend/taskqueue/processor_wrapper.go
@@ -0,0 +1,110 @@
+package taskqueue
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+
+ "github.com/hibiken/asynq"
+)
+
+type processorWrapper struct {
+ impl processor
+ results chan TaskResult
+}
+
+func newProcessorWrapper(impl processor) *processorWrapper {
+ return &processorWrapper{
+ impl: impl,
+ results: make(chan TaskResult),
+ }
+}
+
+func (p *processorWrapper) processTaskCompileSwiftToWasm(ctx context.Context, t *asynq.Task) error {
+ var payload TaskPayloadCompileSwiftToWasm
+ if err := json.Unmarshal(t.Payload(), &payload); err != nil {
+ err := fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
+ p.results <- &TaskResultCompileSwiftToWasm{Err: err}
+ return err
+ }
+
+ result, err := p.impl.doProcessTaskCompileSwiftToWasm(ctx, &payload)
+ if err != nil {
+ retryCount, _ := asynq.GetRetryCount(ctx)
+ maxRetry, _ := asynq.GetMaxRetry(ctx)
+ isRecoverable := !errors.Is(err, asynq.SkipRetry) && retryCount < maxRetry
+ if !isRecoverable {
+ p.results <- &TaskResultCompileSwiftToWasm{Err: err}
+ }
+ return err
+ }
+ p.results <- result
+ return nil
+}
+
+func (p *processorWrapper) processTaskCompileWasmToNativeExecutable(ctx context.Context, t *asynq.Task) error {
+ var payload TaskPayloadCompileWasmToNativeExecutable
+ if err := json.Unmarshal(t.Payload(), &payload); err != nil {
+ err := fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
+ p.results <- &TaskResultCompileWasmToNativeExecutable{Err: err}
+ return err
+ }
+
+ result, err := p.impl.doProcessTaskCompileWasmToNativeExecutable(ctx, &payload)
+ if err != nil {
+ retryCount, _ := asynq.GetRetryCount(ctx)
+ maxRetry, _ := asynq.GetMaxRetry(ctx)
+ isRecoverable := !errors.Is(err, asynq.SkipRetry) && retryCount < maxRetry
+ if !isRecoverable {
+ p.results <- &TaskResultCompileWasmToNativeExecutable{Err: err}
+ }
+ return err
+ }
+ p.results <- result
+ return nil
+}
+
+func (p *processorWrapper) processTaskCreateSubmissionRecord(ctx context.Context, t *asynq.Task) error {
+ var payload TaskPayloadCreateSubmissionRecord
+ if err := json.Unmarshal(t.Payload(), &payload); err != nil {
+ err := fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
+ p.results <- &TaskResultCreateSubmissionRecord{Err: err}
+ return err
+ }
+
+ result, err := p.impl.doProcessTaskCreateSubmissionRecord(ctx, &payload)
+ if err != nil {
+ retryCount, _ := asynq.GetRetryCount(ctx)
+ maxRetry, _ := asynq.GetMaxRetry(ctx)
+ isRecoverable := !errors.Is(err, asynq.SkipRetry) && retryCount < maxRetry
+ if !isRecoverable {
+ p.results <- &TaskResultCreateSubmissionRecord{Err: err}
+ }
+ return err
+ }
+ p.results <- result
+ return nil
+}
+
+func (p *processorWrapper) processTaskRunTestcase(ctx context.Context, t *asynq.Task) error {
+ var payload TaskPayloadRunTestcase
+ if err := json.Unmarshal(t.Payload(), &payload); err != nil {
+ err := fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
+ p.results <- &TaskResultRunTestcase{Err: err}
+ return err
+ }
+
+ result, err := p.impl.doProcessTaskRunTestcase(ctx, &payload)
+ if err != nil {
+ retryCount, _ := asynq.GetRetryCount(ctx)
+ maxRetry, _ := asynq.GetMaxRetry(ctx)
+ isRecoverable := !errors.Is(err, asynq.SkipRetry) && retryCount < maxRetry
+ if !isRecoverable {
+ p.results <- &TaskResultRunTestcase{Err: err}
+ }
+ return err
+ }
+ p.results <- result
+ return nil
+}
diff --git a/backend/taskqueue/worker_server.go b/backend/taskqueue/worker_server.go
index 6dc65d8..317f61c 100644
--- a/backend/taskqueue/worker_server.go
+++ b/backend/taskqueue/worker_server.go
@@ -8,7 +8,7 @@ import (
type WorkerServer struct {
server *asynq.Server
- processor *processor
+ processor *processorWrapper
}
func NewWorkerServer(redisAddr string, queries *db.Queries) *WorkerServer {
@@ -18,7 +18,7 @@ func NewWorkerServer(redisAddr string, queries *db.Queries) *WorkerServer {
},
asynq.Config{},
)
- processor := newProcessor(queries)
+ processor := newProcessorWrapper(newProcessor(queries))
return &WorkerServer{
server: server,
processor: processor,