diff options
| -rw-r--r-- | backend/taskqueue/processor.go | 168 | ||||
| -rw-r--r-- | backend/taskqueue/processor_wrapper.go | 110 | ||||
| -rw-r--r-- | backend/taskqueue/worker_server.go | 4 |
3 files changed, 185 insertions, 97 deletions
diff --git a/backend/taskqueue/processor.go b/backend/taskqueue/processor.go index 556bd78..d771e61 100644 --- a/backend/taskqueue/processor.go +++ b/backend/taskqueue/processor.go @@ -8,32 +8,24 @@ import ( "net/http" "strings" - "github.com/hibiken/asynq" - "github.com/nsfisis/iosdc-japan-2024-albatross/backend/db" ) type processor struct { - q *db.Queries - results chan TaskResult + q *db.Queries } -func newProcessor(q *db.Queries) *processor { - return &processor{ - q: q, - results: make(chan TaskResult), +func newProcessor(q *db.Queries) processor { + return processor{ + q: q, } } -func (p *processor) processTaskCreateSubmissionRecord(ctx context.Context, t *asynq.Task) error { - var payload TaskPayloadCreateSubmissionRecord - if err := json.Unmarshal(t.Payload(), &payload); err != nil { - // TODO: send result - return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) - } - +func (p *processor) doProcessTaskCreateSubmissionRecord( + ctx context.Context, + payload *TaskPayloadCreateSubmissionRecord, +) (*TaskResultCreateSubmissionRecord, error) { // TODO: upsert - // Create submission record. submissionID, err := p.q.CreateSubmission(ctx, db.CreateSubmissionParams{ GameID: int32(payload.GameID()), UserID: int32(payload.UserID()), @@ -41,24 +33,19 @@ func (p *processor) processTaskCreateSubmissionRecord(ctx context.Context, t *as CodeSize: int32(payload.CodeSize), }) if err != nil { - // TODO: send result - return fmt.Errorf("CreateSubmission failed: %v", err) + return nil, err } - p.results <- &TaskResultCreateSubmissionRecord{ - TaskPayload: &payload, + return &TaskResultCreateSubmissionRecord{ + TaskPayload: payload, SubmissionID: int(submissionID), - } - return nil + }, nil } -func (p *processor) processTaskCompileSwiftToWasm(ctx context.Context, t *asynq.Task) error { - var payload TaskPayloadCompileSwiftToWasm - if err := json.Unmarshal(t.Payload(), &payload); err != nil { - // TODO: send result - return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) - } - +func (p *processor) doProcessTaskCompileSwiftToWasm( + ctx context.Context, + payload *TaskPayloadCompileSwiftToWasm, +) (*TaskResultCompileSwiftToWasm, error) { type swiftcRequestData struct { MaxDuration int `json:"max_duration_ms"` Code string `json:"code"` @@ -74,18 +61,15 @@ func (p *processor) processTaskCompileSwiftToWasm(ctx context.Context, t *asynq. } reqJson, err := json.Marshal(reqData) if err != nil { - // TODO: send result - return fmt.Errorf("json.Marshal failed: %v", err) + return nil, fmt.Errorf("json.Marshal failed: %v", err) } res, err := http.Post("http://worker:80/api/swiftc", "application/json", bytes.NewBuffer(reqJson)) if err != nil { - // TODO: send result - return fmt.Errorf("http.Post failed: %v", err) + return nil, fmt.Errorf("http.Post failed: %v", err) } resData := swiftcResponseData{} if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { - // TODO: send result - return fmt.Errorf("json.Decode failed: %v", err) + return nil, fmt.Errorf("json.Decode failed: %v", err) } if resData.Status != "success" { err := p.q.CreateSubmissionResult(ctx, db.CreateSubmissionResultParams{ @@ -95,27 +79,28 @@ func (p *processor) processTaskCompileSwiftToWasm(ctx context.Context, t *asynq. Stderr: resData.Stderr, }) if err != nil { - // TODO: send result - return fmt.Errorf("CreateTestcaseResult failed: %v", err) + return nil, fmt.Errorf("CreateTestcaseResult failed: %v", err) } - p.results <- &TaskResultCompileSwiftToWasm{ - TaskPayload: &payload, + return &TaskResultCompileSwiftToWasm{ + TaskPayload: payload, Status: "compile_error", - } - return fmt.Errorf("swiftc failed: %v", resData.Stderr) + Stdout: resData.Stdout, + Stderr: resData.Stderr, + }, nil } - // TODO: send result - return nil + return &TaskResultCompileSwiftToWasm{ + TaskPayload: payload, + Status: "success", + Stdout: resData.Stdout, + Stderr: resData.Stderr, + }, nil } -func (p *processor) processTaskCompileWasmToNativeExecutable(ctx context.Context, t *asynq.Task) error { - var payload TaskPayloadCompileWasmToNativeExecutable - if err := json.Unmarshal(t.Payload(), &payload); err != nil { - // TODO: send result - return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) - } - +func (p *processor) doProcessTaskCompileWasmToNativeExecutable( + ctx context.Context, + payload *TaskPayloadCompileWasmToNativeExecutable, +) (*TaskResultCompileWasmToNativeExecutable, error) { type wasmcRequestData struct { MaxDuration int `json:"max_duration_ms"` Code string `json:"code"` @@ -131,18 +116,15 @@ func (p *processor) processTaskCompileWasmToNativeExecutable(ctx context.Context } reqJson, err := json.Marshal(reqData) if err != nil { - // TODO: send result - return fmt.Errorf("json.Marshal failed: %v", err) + return nil, fmt.Errorf("json.Marshal failed: %v", err) } res, err := http.Post("http://worker:80/api/wasmc", "application/json", bytes.NewBuffer(reqJson)) if err != nil { - // TODO: send result - return fmt.Errorf("http.Post failed: %v", err) + return nil, fmt.Errorf("http.Post failed: %v", err) } resData := wasmcResponseData{} if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { - // TODO: send result - return fmt.Errorf("json.Decode failed: %v", err) + return nil, fmt.Errorf("json.Decode failed: %v", err) } if resData.Status != "success" { err := p.q.CreateSubmissionResult(ctx, db.CreateSubmissionResultParams{ @@ -152,31 +134,31 @@ func (p *processor) processTaskCompileWasmToNativeExecutable(ctx context.Context Stderr: resData.Stderr, }) if err != nil { - // TODO: send result - return fmt.Errorf("CreateTestcaseResult failed: %v", err) + return nil, fmt.Errorf("CreateTestcaseResult failed: %v", err) } - p.results <- &TaskResultCompileWasmToNativeExecutable{ - TaskPayload: &payload, + return &TaskResultCompileWasmToNativeExecutable{ + TaskPayload: payload, Status: "compile_error", - } - return fmt.Errorf("wasmc failed: %v", resData.Stderr) + Stdout: resData.Stdout, + Stderr: resData.Stderr, + }, nil } - // TODO: send result - return nil + return &TaskResultCompileWasmToNativeExecutable{ + TaskPayload: payload, + Status: "success", + Stdout: resData.Stdout, + Stderr: resData.Stderr, + }, nil } -func (p *processor) processTaskRunTestcase(ctx context.Context, t *asynq.Task) error { - var payload TaskPayloadRunTestcase - if err := json.Unmarshal(t.Payload(), &payload); err != nil { - // TODO: send result - return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) - } - +func (p *processor) doProcessTaskRunTestcase( + ctx context.Context, + payload *TaskPayloadRunTestcase, +) (*TaskResultRunTestcase, error) { testcases, err := p.q.ListTestcasesByGameID(ctx, int32(payload.GameID())) if err != nil { - // TODO: send result - return fmt.Errorf("ListTestcasesByGameID failed: %v", err) + return nil, fmt.Errorf("ListTestcasesByGameID failed: %v", err) } for _, testcase := range testcases { @@ -197,18 +179,15 @@ func (p *processor) processTaskRunTestcase(ctx context.Context, t *asynq.Task) e } reqJson, err := json.Marshal(reqData) if err != nil { - // TODO: send result - return fmt.Errorf("json.Marshal failed: %v", err) + return nil, fmt.Errorf("json.Marshal failed: %v", err) } res, err := http.Post("http://worker:80/api/testrun", "application/json", bytes.NewBuffer(reqJson)) if err != nil { - // TODO: send result - return fmt.Errorf("http.Post failed: %v", err) + return nil, fmt.Errorf("http.Post failed: %v", err) } resData := testrunResponseData{} if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { - // TODO: send result - return fmt.Errorf("json.Decode failed: %v", err) + return nil, fmt.Errorf("json.Decode failed: %v", err) } if resData.Status != "success" { err := p.q.CreateTestcaseResult(ctx, db.CreateTestcaseResultParams{ @@ -219,14 +198,14 @@ func (p *processor) processTaskRunTestcase(ctx context.Context, t *asynq.Task) e Stderr: resData.Stderr, }) if err != nil { - // TODO: send result - return fmt.Errorf("CreateTestcaseResult failed: %v", err) + return nil, fmt.Errorf("CreateTestcaseResult failed: %v", err) } - p.results <- &TaskResultRunTestcase{ - TaskPayload: &payload, + return &TaskResultRunTestcase{ + TaskPayload: payload, Status: resData.Status, - } - return fmt.Errorf("testrun failed: %v", resData.Stderr) + Stdout: resData.Stdout, + Stderr: resData.Stderr, + }, nil } if !isTestcaseResultCorrect(testcase.Stdout, resData.Stdout) { err := p.q.CreateTestcaseResult(ctx, db.CreateTestcaseResultParams{ @@ -237,22 +216,21 @@ func (p *processor) processTaskRunTestcase(ctx context.Context, t *asynq.Task) e Stderr: resData.Stderr, }) if err != nil { - // TODO: send result - return fmt.Errorf("CreateTestcaseResult failed: %v", err) + return nil, fmt.Errorf("CreateTestcaseResult failed: %v", err) } - p.results <- &TaskResultRunTestcase{ - TaskPayload: &payload, + return &TaskResultRunTestcase{ + TaskPayload: payload, Status: "wrong_answer", - } - return fmt.Errorf("testrun failed: %v", resData.Stdout) + Stdout: resData.Stdout, + Stderr: resData.Stderr, + }, nil } } - p.results <- &TaskResultRunTestcase{ - TaskPayload: &payload, + return &TaskResultRunTestcase{ + TaskPayload: payload, Status: "success", - } - return nil + }, nil } func isTestcaseResultCorrect(expectedStdout, actualStdout string) bool { diff --git a/backend/taskqueue/processor_wrapper.go b/backend/taskqueue/processor_wrapper.go new file mode 100644 index 0000000..1ae9f2f --- /dev/null +++ b/backend/taskqueue/processor_wrapper.go @@ -0,0 +1,110 @@ +package taskqueue + +import ( + "context" + "encoding/json" + "errors" + "fmt" + + "github.com/hibiken/asynq" +) + +type processorWrapper struct { + impl processor + results chan TaskResult +} + +func newProcessorWrapper(impl processor) *processorWrapper { + return &processorWrapper{ + impl: impl, + results: make(chan TaskResult), + } +} + +func (p *processorWrapper) processTaskCompileSwiftToWasm(ctx context.Context, t *asynq.Task) error { + var payload TaskPayloadCompileSwiftToWasm + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + err := fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) + p.results <- &TaskResultCompileSwiftToWasm{Err: err} + return err + } + + result, err := p.impl.doProcessTaskCompileSwiftToWasm(ctx, &payload) + if err != nil { + retryCount, _ := asynq.GetRetryCount(ctx) + maxRetry, _ := asynq.GetMaxRetry(ctx) + isRecoverable := !errors.Is(err, asynq.SkipRetry) && retryCount < maxRetry + if !isRecoverable { + p.results <- &TaskResultCompileSwiftToWasm{Err: err} + } + return err + } + p.results <- result + return nil +} + +func (p *processorWrapper) processTaskCompileWasmToNativeExecutable(ctx context.Context, t *asynq.Task) error { + var payload TaskPayloadCompileWasmToNativeExecutable + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + err := fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) + p.results <- &TaskResultCompileWasmToNativeExecutable{Err: err} + return err + } + + result, err := p.impl.doProcessTaskCompileWasmToNativeExecutable(ctx, &payload) + if err != nil { + retryCount, _ := asynq.GetRetryCount(ctx) + maxRetry, _ := asynq.GetMaxRetry(ctx) + isRecoverable := !errors.Is(err, asynq.SkipRetry) && retryCount < maxRetry + if !isRecoverable { + p.results <- &TaskResultCompileWasmToNativeExecutable{Err: err} + } + return err + } + p.results <- result + return nil +} + +func (p *processorWrapper) processTaskCreateSubmissionRecord(ctx context.Context, t *asynq.Task) error { + var payload TaskPayloadCreateSubmissionRecord + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + err := fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) + p.results <- &TaskResultCreateSubmissionRecord{Err: err} + return err + } + + result, err := p.impl.doProcessTaskCreateSubmissionRecord(ctx, &payload) + if err != nil { + retryCount, _ := asynq.GetRetryCount(ctx) + maxRetry, _ := asynq.GetMaxRetry(ctx) + isRecoverable := !errors.Is(err, asynq.SkipRetry) && retryCount < maxRetry + if !isRecoverable { + p.results <- &TaskResultCreateSubmissionRecord{Err: err} + } + return err + } + p.results <- result + return nil +} + +func (p *processorWrapper) processTaskRunTestcase(ctx context.Context, t *asynq.Task) error { + var payload TaskPayloadRunTestcase + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + err := fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) + p.results <- &TaskResultRunTestcase{Err: err} + return err + } + + result, err := p.impl.doProcessTaskRunTestcase(ctx, &payload) + if err != nil { + retryCount, _ := asynq.GetRetryCount(ctx) + maxRetry, _ := asynq.GetMaxRetry(ctx) + isRecoverable := !errors.Is(err, asynq.SkipRetry) && retryCount < maxRetry + if !isRecoverable { + p.results <- &TaskResultRunTestcase{Err: err} + } + return err + } + p.results <- result + return nil +} diff --git a/backend/taskqueue/worker_server.go b/backend/taskqueue/worker_server.go index 6dc65d8..317f61c 100644 --- a/backend/taskqueue/worker_server.go +++ b/backend/taskqueue/worker_server.go @@ -8,7 +8,7 @@ import ( type WorkerServer struct { server *asynq.Server - processor *processor + processor *processorWrapper } func NewWorkerServer(redisAddr string, queries *db.Queries) *WorkerServer { @@ -18,7 +18,7 @@ func NewWorkerServer(redisAddr string, queries *db.Queries) *WorkerServer { }, asynq.Config{}, ) - processor := newProcessor(queries) + processor := newProcessorWrapper(newProcessor(queries)) return &WorkerServer{ server: server, processor: processor, |
