diff options
| author | nsfisis <nsfisis@gmail.com> | 2024-08-08 19:30:20 +0900 |
|---|---|---|
| committer | nsfisis <nsfisis@gmail.com> | 2024-08-08 19:30:20 +0900 |
| commit | 9e5500269746d3826382a6dec78f0e82cfda0d42 (patch) | |
| tree | d8f15d4081132d6cfff3ea3b02f197b2d6bad700 | |
| parent | 401a28944fc0408811aedadd1c3104e2e2d4d7fe (diff) | |
| parent | 113c83b19acc58fbd46e8acdac67ff1a112d0d8c (diff) | |
| download | phperkaigi-2025-albatross-9e5500269746d3826382a6dec78f0e82cfda0d42.tar.gz phperkaigi-2025-albatross-9e5500269746d3826382a6dec78f0e82cfda0d42.tar.zst phperkaigi-2025-albatross-9e5500269746d3826382a6dec78f0e82cfda0d42.zip | |
Merge branch 'feat/taskqueue'
| -rw-r--r-- | backend/db/models.go | 24 | ||||
| -rw-r--r-- | backend/db/query.sql.go | 56 | ||||
| -rw-r--r-- | backend/fixtures/dev.sql | 1 | ||||
| -rw-r--r-- | backend/game/hub.go | 352 | ||||
| -rw-r--r-- | backend/gen/api/handler_wrapper_gen.go (renamed from backend/gen/api_handler_wrapper_gen.go) | 0 | ||||
| -rw-r--r-- | backend/gen/gen.go | 5 | ||||
| -rw-r--r-- | backend/gen/taskqueue/processor_wrapper_gen.go | 132 | ||||
| -rw-r--r-- | backend/main.go | 6 | ||||
| -rw-r--r-- | backend/query.sql | 22 | ||||
| -rw-r--r-- | backend/schema.sql | 31 | ||||
| -rw-r--r-- | backend/taskqueue/processor.go | 289 | ||||
| -rw-r--r-- | backend/taskqueue/processor_wrapper.go | 112 | ||||
| -rw-r--r-- | backend/taskqueue/queue.go | 82 | ||||
| -rw-r--r-- | backend/taskqueue/tasks.go | 183 | ||||
| -rw-r--r-- | backend/taskqueue/worker_server.go | 34 | ||||
| -rw-r--r-- | worker/exec.go | 10 | ||||
| -rw-r--r-- | worker/models.go | 8 |
17 files changed, 1092 insertions, 255 deletions
diff --git a/backend/db/models.go b/backend/db/models.go index 431d860..d4cf98b 100644 --- a/backend/db/models.go +++ b/backend/db/models.go @@ -39,6 +39,15 @@ type Submission struct { CreatedAt pgtype.Timestamp } +type SubmissionResult struct { + SubmissionResultID int32 + SubmissionID int32 + Status string + Stdout string + Stderr string + CreatedAt pgtype.Timestamp +} + type Testcase struct { TestcaseID int32 ProblemID int32 @@ -46,13 +55,14 @@ type Testcase struct { Stdout string } -type TestcaseExecution struct { - TestcaseExecutionID int32 - SubmissionID int32 - TestcaseID *int32 - Status string - Stdout string - Stderr string +type TestcaseResult struct { + TestcaseResultID int32 + SubmissionID int32 + TestcaseID int32 + Status string + Stdout string + Stderr string + CreatedAt pgtype.Timestamp } type User struct { diff --git a/backend/db/query.sql.go b/backend/db/query.sql.go index 18acfda..5475067 100644 --- a/backend/db/query.sql.go +++ b/backend/db/query.sql.go @@ -11,6 +11,28 @@ import ( "github.com/jackc/pgx/v5/pgtype" ) +const aggregateTestcaseResults = `-- name: AggregateTestcaseResults :one +SELECT + CASE + WHEN COUNT(CASE WHEN r.status IS NULL THEN 1 END) > 0 THEN 'running' + WHEN COUNT(CASE WHEN r.status = 'internal_error' THEN 1 END) > 0 THEN 'internal_error' + WHEN COUNT(CASE WHEN r.status = 'timeout' THEN 1 END) > 0 THEN 'timeout' + WHEN COUNT(CASE WHEN r.status = 'runtime_error' THEN 1 END) > 0 THEN 'runtime_error' + WHEN COUNT(CASE WHEN r.status = 'wrong_answer' THEN 1 END) > 0 THEN 'wrong_answer' + ELSE 'success' + END AS status +FROM testcases +LEFT JOIN testcase_results AS r ON testcases.testcase_id = r.testcase_id +WHERE r.submission_id = $1 +` + +func (q *Queries) AggregateTestcaseResults(ctx context.Context, submissionID int32) (string, error) { + row := q.db.QueryRow(ctx, aggregateTestcaseResults, submissionID) + var status string + err := row.Scan(&status) + return status, err +} + const createSubmission = `-- name: CreateSubmission :one INSERT INTO submissions (game_id, user_id, code, code_size) VALUES ($1, $2, $3, $4) @@ -36,21 +58,43 @@ func (q *Queries) CreateSubmission(ctx context.Context, arg CreateSubmissionPara return submission_id, err } -const createTestcaseExecution = `-- name: CreateTestcaseExecution :exec -INSERT INTO testcase_executions (submission_id, testcase_id, status, stdout, stderr) +const createSubmissionResult = `-- name: CreateSubmissionResult :exec +INSERT INTO submission_results (submission_id, status, stdout, stderr) +VALUES ($1, $2, $3, $4) +` + +type CreateSubmissionResultParams struct { + SubmissionID int32 + Status string + Stdout string + Stderr string +} + +func (q *Queries) CreateSubmissionResult(ctx context.Context, arg CreateSubmissionResultParams) error { + _, err := q.db.Exec(ctx, createSubmissionResult, + arg.SubmissionID, + arg.Status, + arg.Stdout, + arg.Stderr, + ) + return err +} + +const createTestcaseResult = `-- name: CreateTestcaseResult :exec +INSERT INTO testcase_results (submission_id, testcase_id, status, stdout, stderr) VALUES ($1, $2, $3, $4, $5) ` -type CreateTestcaseExecutionParams struct { +type CreateTestcaseResultParams struct { SubmissionID int32 - TestcaseID *int32 + TestcaseID int32 Status string Stdout string Stderr string } -func (q *Queries) CreateTestcaseExecution(ctx context.Context, arg CreateTestcaseExecutionParams) error { - _, err := q.db.Exec(ctx, createTestcaseExecution, +func (q *Queries) CreateTestcaseResult(ctx context.Context, arg CreateTestcaseResultParams) error { + _, err := q.db.Exec(ctx, createTestcaseResult, arg.SubmissionID, arg.TestcaseID, arg.Status, diff --git a/backend/fixtures/dev.sql b/backend/fixtures/dev.sql index 89f709a..2daa8f5 100644 --- a/backend/fixtures/dev.sql +++ b/backend/fixtures/dev.sql @@ -54,5 +54,6 @@ VALUES INSERT INTO testcases (problem_id, stdin, stdout) VALUES + (1, '', '42'), (4, '', '42'), (7, '', '42'); diff --git a/backend/game/hub.go b/backend/game/hub.go index 719b216..11a466b 100644 --- a/backend/game/hub.go +++ b/backend/game/hub.go @@ -4,6 +4,7 @@ import ( "context" "errors" "log" + "strings" "time" "github.com/jackc/pgx/v5/pgtype" @@ -33,7 +34,7 @@ type gameHub struct { watchers map[*watcherClient]bool registerWatcher chan *watcherClient unregisterWatcher chan *watcherClient - testcaseExecution chan string + taskResults chan taskqueue.TaskResult } func newGameHub(ctx context.Context, game *game, q *db.Queries, taskQueue *taskqueue.Queue) *gameHub { @@ -49,7 +50,7 @@ func newGameHub(ctx context.Context, game *game, q *db.Queries, taskQueue *taskq watchers: make(map[*watcherClient]bool), registerWatcher: make(chan *watcherClient), unregisterWatcher: make(chan *watcherClient), - testcaseExecution: make(chan string), + taskResults: make(chan taskqueue.TaskResult), } } @@ -161,26 +162,19 @@ func (hub *gameHub) run() { // TODO: assert game state is gaming log.Printf("submit: %v", message.message) code := msg.Data.Code - task, err := taskqueue.NewExecTask(hub.game.gameID, message.client.playerID, code) - if err != nil { - log.Fatalf("failed to create task: %v", err) + codeSize := len(code) // TODO: exclude whitespaces. + if err := hub.taskQueue.EnqueueTaskCreateSubmissionRecord( + hub.game.gameID, + message.client.playerID, + code, + codeSize, + ); err != nil { + // TODO: notify failure to player + log.Fatalf("failed to enqueue task: %v", err) } - hub.taskQueue.Enqueue(task) default: log.Printf("unexpected message type: %T", message.message) } - case executionStatus := <-hub.testcaseExecution: - _ = executionStatus - for player := range hub.players { - player.s2cMessages <- &playerMessageS2CExecResult{ - Type: playerMessageTypeS2CExecResult, - Data: playerMessageS2CExecResultPayload{ - Score: nil, - Status: api.GamePlayerMessageS2CExecResultPayloadStatus(executionStatus), - }, - } - } - // broadcast to watchers case <-ticker.C: if hub.game.state == gameStateStarting { if time.Now().After(*hub.game.startedAt) { @@ -211,6 +205,296 @@ func (hub *gameHub) run() { } } +type codeSubmissionError struct { + Status string + Stdout string + Stderr string +} + +func (err *codeSubmissionError) Error() string { + return err.Stderr +} + +func (hub *gameHub) processTaskResults() { + for taskResult := range hub.taskResults { + switch taskResult := taskResult.(type) { + case *taskqueue.TaskResultCreateSubmissionRecord: + err := hub.processTaskResultCreateSubmissionRecord(taskResult) + if err != nil { + for player := range hub.players { + if player.playerID != taskResult.TaskPayload.UserID() { + continue + } + player.s2cMessages <- &playerMessageS2CExecResult{ + Type: playerMessageTypeS2CExecResult, + Data: playerMessageS2CExecResultPayload{ + Score: nil, + Status: api.GamePlayerMessageS2CExecResultPayloadStatus(err.Status), + }, + } + } + // TODO: broadcast to watchers + } + case *taskqueue.TaskResultCompileSwiftToWasm: + err := hub.processTaskResultCompileSwiftToWasm(taskResult) + if err != nil { + for player := range hub.players { + if player.playerID != taskResult.TaskPayload.UserID() { + continue + } + player.s2cMessages <- &playerMessageS2CExecResult{ + Type: playerMessageTypeS2CExecResult, + Data: playerMessageS2CExecResultPayload{ + Score: nil, + Status: api.GamePlayerMessageS2CExecResultPayloadStatus(err.Status), + }, + } + } + // TODO: broadcast to watchers + } + case *taskqueue.TaskResultCompileWasmToNativeExecutable: + err := hub.processTaskResultCompileWasmToNativeExecutable(taskResult) + if err != nil { + for player := range hub.players { + if player.playerID != taskResult.TaskPayload.UserID() { + continue + } + player.s2cMessages <- &playerMessageS2CExecResult{ + Type: playerMessageTypeS2CExecResult, + Data: playerMessageS2CExecResultPayload{ + Score: nil, + Status: api.GamePlayerMessageS2CExecResultPayloadStatus(err.Status), + }, + } + } + // TODO: broadcast to watchers + } + case *taskqueue.TaskResultRunTestcase: + var err error + err = hub.processTaskResultRunTestcase(taskResult) + _ = err // TODO: handle err? + aggregatedStatus, err := hub.q.AggregateTestcaseResults(hub.ctx, int32(taskResult.TaskPayload.SubmissionID)) + _ = err // TODO: handle err? + err = hub.q.CreateSubmissionResult(hub.ctx, db.CreateSubmissionResultParams{ + SubmissionID: int32(taskResult.TaskPayload.SubmissionID), + Status: aggregatedStatus, + Stdout: "", + Stderr: "", + }) + if err != nil { + for player := range hub.players { + if player.playerID != taskResult.TaskPayload.UserID() { + continue + } + player.s2cMessages <- &playerMessageS2CExecResult{ + Type: playerMessageTypeS2CExecResult, + Data: playerMessageS2CExecResultPayload{ + Score: nil, + Status: api.GamePlayerMessageS2CExecResultPayloadStatus("internal_error"), + }, + } + } + // TODO: broadcast to watchers + continue + } + for player := range hub.players { + if player.playerID != taskResult.TaskPayload.UserID() { + continue + } + player.s2cMessages <- &playerMessageS2CExecResult{ + Type: playerMessageTypeS2CExecResult, + Data: playerMessageS2CExecResultPayload{ + Score: nil, + Status: api.GamePlayerMessageS2CExecResultPayloadStatus(aggregatedStatus), + }, + } + // TODO: broadcast to watchers + } + default: + panic("unexpected task result type") + } + } +} + +func (hub *gameHub) processTaskResultCreateSubmissionRecord( + taskResult *taskqueue.TaskResultCreateSubmissionRecord, +) *codeSubmissionError { + if taskResult.Err != nil { + return &codeSubmissionError{ + Status: "internal_error", + Stderr: taskResult.Err.Error(), + } + } + + if err := hub.taskQueue.EnqueueTaskCompileSwiftToWasm( + taskResult.TaskPayload.GameID(), + taskResult.TaskPayload.UserID(), + taskResult.TaskPayload.Code(), + taskResult.SubmissionID, + ); err != nil { + return &codeSubmissionError{ + Status: "internal_error", + Stderr: err.Error(), + } + } + return nil +} + +func (hub *gameHub) processTaskResultCompileSwiftToWasm( + taskResult *taskqueue.TaskResultCompileSwiftToWasm, +) *codeSubmissionError { + if taskResult.Err != nil { + return &codeSubmissionError{ + Status: "internal_error", + Stderr: taskResult.Err.Error(), + } + } + + if taskResult.Status != "success" { + if err := hub.q.CreateSubmissionResult(hub.ctx, db.CreateSubmissionResultParams{ + SubmissionID: int32(taskResult.TaskPayload.SubmissionID), + Status: taskResult.Status, + Stdout: taskResult.Stdout, + Stderr: taskResult.Stderr, + }); err != nil { + return &codeSubmissionError{ + Status: "internal_error", + Stderr: err.Error(), + } + } + return &codeSubmissionError{ + Status: taskResult.Status, + Stdout: taskResult.Stdout, + Stderr: taskResult.Stderr, + } + } + if err := hub.taskQueue.EnqueueTaskCompileWasmToNativeExecutable( + taskResult.TaskPayload.GameID(), + taskResult.TaskPayload.UserID(), + taskResult.TaskPayload.Code(), + taskResult.TaskPayload.SubmissionID, + ); err != nil { + return &codeSubmissionError{ + Status: "internal_error", + Stderr: err.Error(), + } + } + return nil +} + +func (hub *gameHub) processTaskResultCompileWasmToNativeExecutable( + taskResult *taskqueue.TaskResultCompileWasmToNativeExecutable, +) *codeSubmissionError { + if taskResult.Err != nil { + return &codeSubmissionError{ + Status: "internal_error", + Stderr: taskResult.Err.Error(), + } + } + + if taskResult.Status != "success" { + if err := hub.q.CreateSubmissionResult(hub.ctx, db.CreateSubmissionResultParams{ + SubmissionID: int32(taskResult.TaskPayload.SubmissionID), + Status: taskResult.Status, + Stdout: taskResult.Stdout, + Stderr: taskResult.Stderr, + }); err != nil { + return &codeSubmissionError{ + Status: "internal_error", + Stderr: err.Error(), + } + } + return &codeSubmissionError{ + Status: taskResult.Status, + Stdout: taskResult.Stdout, + Stderr: taskResult.Stderr, + } + } + + testcases, err := hub.q.ListTestcasesByGameID(hub.ctx, int32(taskResult.TaskPayload.GameID())) + if err != nil { + return &codeSubmissionError{ + Status: "internal_error", + Stderr: err.Error(), + } + } + if len(testcases) == 0 { + return &codeSubmissionError{ + Status: "internal_error", + Stderr: "no testcases found", + } + } + + for _, testcase := range testcases { + if err := hub.taskQueue.EnqueueTaskRunTestcase( + taskResult.TaskPayload.GameID(), + taskResult.TaskPayload.UserID(), + taskResult.TaskPayload.Code(), + taskResult.TaskPayload.SubmissionID, + int(testcase.TestcaseID), + testcase.Stdin, + testcase.Stdout, + ); err != nil { + return &codeSubmissionError{ + Status: "internal_error", + Stderr: err.Error(), + } + } + } + return nil +} + +func (hub *gameHub) processTaskResultRunTestcase( + taskResult *taskqueue.TaskResultRunTestcase, +) *codeSubmissionError { + if taskResult.Err != nil { + return &codeSubmissionError{ + Status: "internal_error", + Stderr: taskResult.Err.Error(), + } + } + + if taskResult.Status != "success" { + if err := hub.q.CreateTestcaseResult(hub.ctx, db.CreateTestcaseResultParams{ + SubmissionID: int32(taskResult.TaskPayload.SubmissionID), + TestcaseID: int32(taskResult.TaskPayload.TestcaseID), + Status: taskResult.Status, + Stdout: taskResult.Stdout, + Stderr: taskResult.Stderr, + }); err != nil { + return &codeSubmissionError{ + Status: "internal_error", + Stderr: err.Error(), + } + } + return &codeSubmissionError{ + Status: taskResult.Status, + Stdout: taskResult.Stdout, + Stderr: taskResult.Stderr, + } + } + if !isTestcaseResultCorrect(taskResult.TaskPayload.Stdout, taskResult.Stdout) { + if err := hub.q.CreateTestcaseResult(hub.ctx, db.CreateTestcaseResultParams{ + SubmissionID: int32(taskResult.TaskPayload.SubmissionID), + TestcaseID: int32(taskResult.TaskPayload.TestcaseID), + Status: "wrong_answer", + Stdout: taskResult.Stdout, + Stderr: taskResult.Stderr, + }); err != nil { + return &codeSubmissionError{ + Status: "internal_error", + Stderr: err.Error(), + } + } + return &codeSubmissionError{ + Status: "wrong_answer", + Stdout: taskResult.Stdout, + Stderr: taskResult.Stderr, + } + } + return nil +} + func (hub *gameHub) startGame() error { for player := range hub.players { player.s2cMessages <- &playerMessageS2CPrepare{ @@ -261,16 +545,18 @@ func (hub *gameHub) closeWatcherClient(watcher *watcherClient) { } type GameHubs struct { - hubs map[int]*gameHub - q *db.Queries - taskQueue *taskqueue.Queue + hubs map[int]*gameHub + q *db.Queries + taskQueue *taskqueue.Queue + taskResults chan taskqueue.TaskResult } -func NewGameHubs(q *db.Queries, taskQueue *taskqueue.Queue) *GameHubs { +func NewGameHubs(q *db.Queries, taskQueue *taskqueue.Queue, taskResults chan taskqueue.TaskResult) *GameHubs { return &GameHubs{ - hubs: make(map[int]*gameHub), - q: q, - taskQueue: taskQueue, + hubs: make(map[int]*gameHub), + q: q, + taskQueue: taskQueue, + taskResults: taskResults, } } @@ -327,6 +613,16 @@ func (hubs *GameHubs) RestoreFromDB(ctx context.Context) error { func (hubs *GameHubs) Run() { for _, hub := range hubs.hubs { go hub.run() + go hub.processTaskResults() + } + + for taskResult := range hubs.taskResults { + hub := hubs.getHub(taskResult.GameID()) + if hub == nil { + log.Printf("no such game: %d", taskResult.GameID()) + continue + } + hub.taskResults <- taskResult } } @@ -342,6 +638,8 @@ func (hubs *GameHubs) StartGame(gameID int) error { return hub.startGame() } -func (hubs *GameHubs) C() chan string { - return hubs.hubs[7].testcaseExecution +func isTestcaseResultCorrect(expectedStdout, actualStdout string) bool { + expectedStdout = strings.TrimSpace(expectedStdout) + actualStdout = strings.TrimSpace(actualStdout) + return actualStdout == expectedStdout } diff --git a/backend/gen/api_handler_wrapper_gen.go b/backend/gen/api/handler_wrapper_gen.go index 7fd34b2..7fd34b2 100644 --- a/backend/gen/api_handler_wrapper_gen.go +++ b/backend/gen/api/handler_wrapper_gen.go diff --git a/backend/gen/gen.go b/backend/gen/gen.go index 6fb430f..276bc05 100644 --- a/backend/gen/gen.go +++ b/backend/gen/gen.go @@ -1,5 +1,6 @@ -package main +package gen //go:generate go run github.com/sqlc-dev/sqlc/cmd/sqlc generate //go:generate go run github.com/oapi-codegen/oapi-codegen/v2/cmd/oapi-codegen -config oapi-codegen.yaml ../../openapi.yaml -//go:generate go run ./api_handler_wrapper_gen.go -i ../api/generated.go -o ../api/handler_wrapper.go +//go:generate go run ./api/handler_wrapper_gen.go -i ../api/generated.go -o ../api/handler_wrapper.go +//go:generate go run ./taskqueue/processor_wrapper_gen.go -i ../taskqueue/tasks.go -o ../taskqueue/processor_wrapper.go diff --git a/backend/gen/taskqueue/processor_wrapper_gen.go b/backend/gen/taskqueue/processor_wrapper_gen.go new file mode 100644 index 0000000..85960ae --- /dev/null +++ b/backend/gen/taskqueue/processor_wrapper_gen.go @@ -0,0 +1,132 @@ +package main + +import ( + "bytes" + "flag" + "go/ast" + "go/format" + "go/parser" + "go/token" + "os" + "slices" + "strings" + "text/template" +) + +func main() { + inputFile := flag.String("i", "", "input file") + outputFile := flag.String("o", "", "output file") + flag.Parse() + + if inputFile == nil || *inputFile == "" || outputFile == nil || *outputFile == "" { + flag.PrintDefaults() + os.Exit(1) + } + + // Parse the input file + fileSet := token.NewFileSet() + parsedFile, err := parser.ParseFile(fileSet, *inputFile, nil, parser.SkipObjectResolution) + if err != nil { + panic(err) + } + + // Find TaskType* constants. + var consts []string + for _, decl := range parsedFile.Decls { + genDecl, ok := decl.(*ast.GenDecl) + if !ok { + continue + } + for _, spec := range genDecl.Specs { + valueSpec, ok := spec.(*ast.ValueSpec) + if !ok { + continue + } + for _, name := range valueSpec.Names { + if !strings.HasPrefix(name.Name, "TaskType") { + continue + } + if name.Name == "TaskType" { + continue + } + consts = append(consts, strings.TrimPrefix(name.Name, "TaskType")) + } + } + } + if len(consts) == 0 { + panic("TaskType* constants not found") + } + slices.Sort(consts) + + // Generate code. + tmpl, err := template.New("code").Parse(templateText) + if err != nil { + panic(err) + } + + var buf bytes.Buffer + err = tmpl.Execute(&buf, consts) + if err != nil { + panic(err) + } + + formatted, err := format.Source(buf.Bytes()) + if err != nil { + panic(err) + } + + err = os.WriteFile(*outputFile, formatted, 0644) + if err != nil { + panic(err) + } +} + +const templateText = `// Code generated by go generate; DO NOT EDIT. + +package taskqueue + +import ( + "context" + "encoding/json" + "errors" + "fmt" + + "github.com/hibiken/asynq" +) + +type processorWrapper struct { + impl processor + results chan TaskResult +} + +func newProcessorWrapper(impl processor) *processorWrapper { + return &processorWrapper{ + impl: impl, + results: make(chan TaskResult), + } +} + +{{ range . }} + func (p *processorWrapper) processTask{{ . }}(ctx context.Context, t *asynq.Task) error { + var payload TaskPayload{{ . }} + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + err := fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) + p.results <- &TaskResult{{ . }}{Err: err} + return err + } + + result, err := p.impl.doProcessTask{{ . }}(ctx, &payload) + if err != nil { + retryCount, _ := asynq.GetRetryCount(ctx) + maxRetry, _ := asynq.GetMaxRetry(ctx) + isRecoverable := !errors.Is(err, asynq.SkipRetry) && retryCount < maxRetry + if !isRecoverable { + p.results <- &TaskResult{{ . }}{Err: err} + } + return err + } + p.results <- result + return nil + } +{{ end }} +` diff --git a/backend/main.go b/backend/main.go index e3d0052..62499d6 100644 --- a/backend/main.go +++ b/backend/main.go @@ -61,8 +61,9 @@ func main() { e.Use(middleware.Recover()) taskQueue := taskqueue.NewQueue("task-db:6379") + workerServer := taskqueue.NewWorkerServer("task-db:6379", queries) - gameHubs := game.NewGameHubs(queries, taskQueue) + gameHubs := game.NewGameHubs(queries, taskQueue, workerServer.Results()) err = gameHubs.RestoreFromDB(ctx) if err != nil { log.Fatalf("Error restoring game hubs from db %v", err) @@ -95,9 +96,8 @@ func main() { return c.Redirect(http.StatusPermanentRedirect, "http://localhost:5173/logout") }) - gameHubs.Run() + go gameHubs.Run() - workerServer := taskqueue.NewWorkerServer("task-db:6379", queries, gameHubs.C()) go func() { workerServer.Run() }() diff --git a/backend/query.sql b/backend/query.sql index ea04c08..e767746 100644 --- a/backend/query.sql +++ b/backend/query.sql @@ -68,6 +68,24 @@ SELECT * FROM testcases WHERE testcases.problem_id = (SELECT problem_id FROM games WHERE game_id = $1) ORDER BY testcases.testcase_id; --- name: CreateTestcaseExecution :exec -INSERT INTO testcase_executions (submission_id, testcase_id, status, stdout, stderr) +-- name: CreateSubmissionResult :exec +INSERT INTO submission_results (submission_id, status, stdout, stderr) +VALUES ($1, $2, $3, $4); + +-- name: CreateTestcaseResult :exec +INSERT INTO testcase_results (submission_id, testcase_id, status, stdout, stderr) VALUES ($1, $2, $3, $4, $5); + +-- name: AggregateTestcaseResults :one +SELECT + CASE + WHEN COUNT(CASE WHEN r.status IS NULL THEN 1 END) > 0 THEN 'running' + WHEN COUNT(CASE WHEN r.status = 'internal_error' THEN 1 END) > 0 THEN 'internal_error' + WHEN COUNT(CASE WHEN r.status = 'timeout' THEN 1 END) > 0 THEN 'timeout' + WHEN COUNT(CASE WHEN r.status = 'runtime_error' THEN 1 END) > 0 THEN 'runtime_error' + WHEN COUNT(CASE WHEN r.status = 'wrong_answer' THEN 1 END) > 0 THEN 'wrong_answer' + ELSE 'success' + END AS status +FROM testcases +LEFT JOIN testcase_results AS r ON testcases.testcase_id = r.testcase_id +WHERE r.submission_id = $1; diff --git a/backend/schema.sql b/backend/schema.sql index d0b6c40..74d1202 100644 --- a/backend/schema.sql +++ b/backend/schema.sql @@ -64,14 +64,27 @@ CREATE TABLE submissions ( CONSTRAINT fk_user_id FOREIGN KEY(user_id) REFERENCES users(user_id) ); -CREATE TABLE testcase_executions ( - testcase_execution_id SERIAL PRIMARY KEY, - submission_id INT NOT NULL, - testcase_id INT, - status VARCHAR(16) NOT NULL, - stdout TEXT NOT NULL, - stderr TEXT NOT NULL, +CREATE TABLE submission_results ( + submission_result_id SERIAL PRIMARY KEY, + submission_id INT NOT NULL UNIQUE, + status VARCHAR(16) NOT NULL, + stdout TEXT NOT NULL, + stderr TEXT NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + CONSTRAINT fk_submission_id FOREIGN KEY(submission_id) REFERENCES submissions(submission_id) +); +CREATE INDEX idx_submission_results_submission_id ON submission_results(submission_id); + +CREATE TABLE testcase_results ( + testcase_result_id SERIAL PRIMARY KEY, + submission_id INT NOT NULL, + testcase_id INT NOT NULL, + status VARCHAR(16) NOT NULL, + stdout TEXT NOT NULL, + stderr TEXT NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), CONSTRAINT fk_submission_id FOREIGN KEY(submission_id) REFERENCES submissions(submission_id), - CONSTRAINT fk_testcase_id FOREIGN KEY(testcase_id) REFERENCES testcases(testcase_id) + CONSTRAINT fk_testcase_id FOREIGN KEY(testcase_id) REFERENCES testcases(testcase_id), + CONSTRAINT uq_submission_id_testcase_id UNIQUE(submission_id, testcase_id) ); -CREATE INDEX idx_testcase_executions_submission_id ON testcase_executions(submission_id); +CREATE INDEX idx_testcase_results_submission_id ON testcase_results(submission_id); diff --git a/backend/taskqueue/processor.go b/backend/taskqueue/processor.go index 1d4c412..b64b01c 100644 --- a/backend/taskqueue/processor.go +++ b/backend/taskqueue/processor.go @@ -6,195 +6,150 @@ import ( "encoding/json" "fmt" "net/http" - "strings" - - "github.com/hibiken/asynq" "github.com/nsfisis/iosdc-japan-2024-albatross/backend/db" ) -type ExecProcessor struct { +type processor struct { q *db.Queries - c chan string } -func NewExecProcessor(q *db.Queries, c chan string) *ExecProcessor { - return &ExecProcessor{ +func newProcessor(q *db.Queries) processor { + return processor{ q: q, - c: c, } } -func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { - var payload TaskExecPlayload - if err := json.Unmarshal(t.Payload(), &payload); err != nil { - return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) - } - +func (p *processor) doProcessTaskCreateSubmissionRecord( + ctx context.Context, + payload *TaskPayloadCreateSubmissionRecord, +) (*TaskResultCreateSubmissionRecord, error) { // TODO: upsert - // Create submission record. submissionID, err := p.q.CreateSubmission(ctx, db.CreateSubmissionParams{ - GameID: int32(payload.GameID), - UserID: int32(payload.UserID), - Code: payload.Code, - CodeSize: int32(len(payload.Code)), // TODO: exclude whitespaces. + GameID: int32(payload.GameID()), + UserID: int32(payload.UserID()), + Code: payload.Code(), + CodeSize: int32(payload.CodeSize), }) if err != nil { - return fmt.Errorf("CreateSubmission failed: %v", err) + return nil, err } - { - type swiftcRequestData struct { - MaxDuration int `json:"max_duration_ms"` - Code string `json:"code"` - } - type swiftcResponseData struct { - Result string `json:"result"` - Stdout string `json:"stdout"` - Stderr string `json:"stderr"` - } - reqData := swiftcRequestData{ - MaxDuration: 5000, - Code: payload.Code, - } - reqJson, err := json.Marshal(reqData) - if err != nil { - return fmt.Errorf("json.Marshal failed: %v", err) - } - res, err := http.Post("http://worker:80/api/swiftc", "application/json", bytes.NewBuffer(reqJson)) - if err != nil { - return fmt.Errorf("http.Post failed: %v", err) - } - resData := swiftcResponseData{} - if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { - return fmt.Errorf("json.Decode failed: %v", err) - } - if resData.Result != "success" { - err := p.q.CreateTestcaseExecution(ctx, db.CreateTestcaseExecutionParams{ - SubmissionID: submissionID, - TestcaseID: nil, - Status: "compile_error", - Stdout: resData.Stdout, - Stderr: resData.Stderr, - }) - if err != nil { - return fmt.Errorf("CreateTestcaseExecution failed: %v", err) - } - p.c <- "compile_error" - return fmt.Errorf("swiftc failed: %v", resData.Stderr) - } - } - { - type wasmcRequestData struct { - MaxDuration int `json:"max_duration_ms"` - Code string `json:"code"` - } - type wasmcResponseData struct { - Result string `json:"result"` - Stdout string `json:"stdout"` - Stderr string `json:"stderr"` - } - reqData := wasmcRequestData{ - MaxDuration: 5000, - Code: payload.Code, - } - reqJson, err := json.Marshal(reqData) - if err != nil { - return fmt.Errorf("json.Marshal failed: %v", err) - } - res, err := http.Post("http://worker:80/api/wasmc", "application/json", bytes.NewBuffer(reqJson)) - if err != nil { - return fmt.Errorf("http.Post failed: %v", err) - } - resData := wasmcResponseData{} - if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { - return fmt.Errorf("json.Decode failed: %v", err) - } - if resData.Result != "success" { - err := p.q.CreateTestcaseExecution(ctx, db.CreateTestcaseExecutionParams{ - SubmissionID: submissionID, - TestcaseID: nil, - Status: "compile_error", - Stdout: resData.Stdout, - Stderr: resData.Stderr, - }) - if err != nil { - return fmt.Errorf("CreateTestcaseExecution failed: %v", err) - } - p.c <- "compile_error" - return fmt.Errorf("wasmc failed: %v", resData.Stderr) - } - } + return &TaskResultCreateSubmissionRecord{ + TaskPayload: payload, + SubmissionID: int(submissionID), + }, nil +} - testcases, err := p.q.ListTestcasesByGameID(ctx, int32(payload.GameID)) +func (p *processor) doProcessTaskCompileSwiftToWasm( + _ context.Context, + payload *TaskPayloadCompileSwiftToWasm, +) (*TaskResultCompileSwiftToWasm, error) { + type swiftcRequestData struct { + MaxDuration int `json:"max_duration_ms"` + Code string `json:"code"` + } + type swiftcResponseData struct { + Status string `json:"status"` + Stdout string `json:"stdout"` + Stderr string `json:"stderr"` + } + reqData := swiftcRequestData{ + MaxDuration: 5000, + Code: payload.Code(), + } + reqJson, err := json.Marshal(reqData) if err != nil { - return fmt.Errorf("ListTestcasesByGameID failed: %v", err) + return nil, fmt.Errorf("json.Marshal failed: %v", err) } - - for _, testcase := range testcases { - type testrunRequestData struct { - MaxDuration int `json:"max_duration_ms"` - Code string `json:"code"` - Stdin string `json:"stdin"` - } - type testrunResponseData struct { - Result string `json:"result"` - Stdout string `json:"stdout"` - Stderr string `json:"stderr"` - } - reqData := testrunRequestData{ - MaxDuration: 5000, - Code: payload.Code, - Stdin: testcase.Stdin, - } - reqJson, err := json.Marshal(reqData) - if err != nil { - return fmt.Errorf("json.Marshal failed: %v", err) - } - res, err := http.Post("http://worker:80/api/testrun", "application/json", bytes.NewBuffer(reqJson)) - if err != nil { - return fmt.Errorf("http.Post failed: %v", err) - } - resData := testrunResponseData{} - if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { - return fmt.Errorf("json.Decode failed: %v", err) - } - if resData.Result != "success" { - err := p.q.CreateTestcaseExecution(ctx, db.CreateTestcaseExecutionParams{ - SubmissionID: submissionID, - TestcaseID: &testcase.TestcaseID, - Status: resData.Result, - Stdout: resData.Stdout, - Stderr: resData.Stderr, - }) - if err != nil { - return fmt.Errorf("CreateTestcaseExecution failed: %v", err) - } - p.c <- resData.Result - return fmt.Errorf("testrun failed: %v", resData.Stderr) - } - if !isTestcaseExecutionCorrect(testcase.Stdout, resData.Stdout) { - err := p.q.CreateTestcaseExecution(ctx, db.CreateTestcaseExecutionParams{ - SubmissionID: submissionID, - TestcaseID: &testcase.TestcaseID, - Status: "wrong_answer", - Stdout: resData.Stdout, - Stderr: resData.Stderr, - }) - if err != nil { - return fmt.Errorf("CreateTestcaseExecution failed: %v", err) - } - p.c <- "wrong_answer" - return fmt.Errorf("testrun failed: %v", resData.Stdout) - } + res, err := http.Post("http://worker:80/api/swiftc", "application/json", bytes.NewBuffer(reqJson)) + if err != nil { + return nil, fmt.Errorf("http.Post failed: %v", err) + } + resData := swiftcResponseData{} + if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { + return nil, fmt.Errorf("json.Decode failed: %v", err) } + return &TaskResultCompileSwiftToWasm{ + TaskPayload: payload, + Status: resData.Status, + Stdout: resData.Stdout, + Stderr: resData.Stderr, + }, nil +} - p.c <- "success" - return nil +func (p *processor) doProcessTaskCompileWasmToNativeExecutable( + _ context.Context, + payload *TaskPayloadCompileWasmToNativeExecutable, +) (*TaskResultCompileWasmToNativeExecutable, error) { + type wasmcRequestData struct { + MaxDuration int `json:"max_duration_ms"` + Code string `json:"code"` + } + type wasmcResponseData struct { + Status string `json:"status"` + Stdout string `json:"stdout"` + Stderr string `json:"stderr"` + } + reqData := wasmcRequestData{ + MaxDuration: 5000, + Code: payload.Code(), + } + reqJson, err := json.Marshal(reqData) + if err != nil { + return nil, fmt.Errorf("json.Marshal failed: %v", err) + } + res, err := http.Post("http://worker:80/api/wasmc", "application/json", bytes.NewBuffer(reqJson)) + if err != nil { + return nil, fmt.Errorf("http.Post failed: %v", err) + } + resData := wasmcResponseData{} + if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { + return nil, fmt.Errorf("json.Decode failed: %v", err) + } + return &TaskResultCompileWasmToNativeExecutable{ + TaskPayload: payload, + Status: resData.Status, + Stdout: resData.Stdout, + Stderr: resData.Stderr, + }, nil } -func isTestcaseExecutionCorrect(expectedStdout, actualStdout string) bool { - expectedStdout = strings.TrimSpace(expectedStdout) - actualStdout = strings.TrimSpace(actualStdout) - return actualStdout == expectedStdout +func (p *processor) doProcessTaskRunTestcase( + _ context.Context, + payload *TaskPayloadRunTestcase, +) (*TaskResultRunTestcase, error) { + type testrunRequestData struct { + MaxDuration int `json:"max_duration_ms"` + Code string `json:"code"` + Stdin string `json:"stdin"` + } + type testrunResponseData struct { + Status string `json:"status"` + Stdout string `json:"stdout"` + Stderr string `json:"stderr"` + } + reqData := testrunRequestData{ + MaxDuration: 5000, + Code: payload.Code(), + Stdin: payload.Stdin, + } + reqJson, err := json.Marshal(reqData) + if err != nil { + return nil, fmt.Errorf("json.Marshal failed: %v", err) + } + res, err := http.Post("http://worker:80/api/testrun", "application/json", bytes.NewBuffer(reqJson)) + if err != nil { + return nil, fmt.Errorf("http.Post failed: %v", err) + } + resData := testrunResponseData{} + if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { + return nil, fmt.Errorf("json.Decode failed: %v", err) + } + return &TaskResultRunTestcase{ + TaskPayload: payload, + Status: resData.Status, + Stdout: resData.Stdout, + Stderr: resData.Stderr, + }, nil } diff --git a/backend/taskqueue/processor_wrapper.go b/backend/taskqueue/processor_wrapper.go new file mode 100644 index 0000000..b1fbd16 --- /dev/null +++ b/backend/taskqueue/processor_wrapper.go @@ -0,0 +1,112 @@ +// Code generated by go generate; DO NOT EDIT. + +package taskqueue + +import ( + "context" + "encoding/json" + "errors" + "fmt" + + "github.com/hibiken/asynq" +) + +type processorWrapper struct { + impl processor + results chan TaskResult +} + +func newProcessorWrapper(impl processor) *processorWrapper { + return &processorWrapper{ + impl: impl, + results: make(chan TaskResult), + } +} + +func (p *processorWrapper) processTaskCompileSwiftToWasm(ctx context.Context, t *asynq.Task) error { + var payload TaskPayloadCompileSwiftToWasm + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + err := fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) + p.results <- &TaskResultCompileSwiftToWasm{Err: err} + return err + } + + result, err := p.impl.doProcessTaskCompileSwiftToWasm(ctx, &payload) + if err != nil { + retryCount, _ := asynq.GetRetryCount(ctx) + maxRetry, _ := asynq.GetMaxRetry(ctx) + isRecoverable := !errors.Is(err, asynq.SkipRetry) && retryCount < maxRetry + if !isRecoverable { + p.results <- &TaskResultCompileSwiftToWasm{Err: err} + } + return err + } + p.results <- result + return nil +} + +func (p *processorWrapper) processTaskCompileWasmToNativeExecutable(ctx context.Context, t *asynq.Task) error { + var payload TaskPayloadCompileWasmToNativeExecutable + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + err := fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) + p.results <- &TaskResultCompileWasmToNativeExecutable{Err: err} + return err + } + + result, err := p.impl.doProcessTaskCompileWasmToNativeExecutable(ctx, &payload) + if err != nil { + retryCount, _ := asynq.GetRetryCount(ctx) + maxRetry, _ := asynq.GetMaxRetry(ctx) + isRecoverable := !errors.Is(err, asynq.SkipRetry) && retryCount < maxRetry + if !isRecoverable { + p.results <- &TaskResultCompileWasmToNativeExecutable{Err: err} + } + return err + } + p.results <- result + return nil +} + +func (p *processorWrapper) processTaskCreateSubmissionRecord(ctx context.Context, t *asynq.Task) error { + var payload TaskPayloadCreateSubmissionRecord + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + err := fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) + p.results <- &TaskResultCreateSubmissionRecord{Err: err} + return err + } + + result, err := p.impl.doProcessTaskCreateSubmissionRecord(ctx, &payload) + if err != nil { + retryCount, _ := asynq.GetRetryCount(ctx) + maxRetry, _ := asynq.GetMaxRetry(ctx) + isRecoverable := !errors.Is(err, asynq.SkipRetry) && retryCount < maxRetry + if !isRecoverable { + p.results <- &TaskResultCreateSubmissionRecord{Err: err} + } + return err + } + p.results <- result + return nil +} + +func (p *processorWrapper) processTaskRunTestcase(ctx context.Context, t *asynq.Task) error { + var payload TaskPayloadRunTestcase + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + err := fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) + p.results <- &TaskResultRunTestcase{Err: err} + return err + } + + result, err := p.impl.doProcessTaskRunTestcase(ctx, &payload) + if err != nil { + retryCount, _ := asynq.GetRetryCount(ctx) + maxRetry, _ := asynq.GetMaxRetry(ctx) + isRecoverable := !errors.Is(err, asynq.SkipRetry) && retryCount < maxRetry + if !isRecoverable { + p.results <- &TaskResultRunTestcase{Err: err} + } + return err + } + p.results <- result + return nil +} diff --git a/backend/taskqueue/queue.go b/backend/taskqueue/queue.go index 53ec6d6..515a406 100644 --- a/backend/taskqueue/queue.go +++ b/backend/taskqueue/queue.go @@ -20,6 +20,84 @@ func (q *Queue) Close() { q.client.Close() } -func (q *Queue) Enqueue(task *asynq.Task, opts ...asynq.Option) (*asynq.TaskInfo, error) { - return q.client.Enqueue(task, opts...) +func (q *Queue) EnqueueTaskCreateSubmissionRecord( + gameID int, + userID int, + code string, + codeSize int, +) error { + task, err := newTaskCreateSubmissionRecord( + gameID, + userID, + code, + codeSize, + ) + if err != nil { + return err + } + _, err = q.client.Enqueue(task) + return err +} + +func (q *Queue) EnqueueTaskCompileSwiftToWasm( + gameID int, + userID int, + code string, + submissionID int, +) error { + task, err := newTaskCompileSwiftToWasm( + gameID, + userID, + code, + submissionID, + ) + if err != nil { + return err + } + _, err = q.client.Enqueue(task) + return err +} + +func (q *Queue) EnqueueTaskCompileWasmToNativeExecutable( + gameID int, + userID int, + code string, + submissionID int, +) error { + task, err := newTaskCompileWasmToNativeExecutable( + gameID, + userID, + code, + submissionID, + ) + if err != nil { + return err + } + _, err = q.client.Enqueue(task) + return err +} + +func (q *Queue) EnqueueTaskRunTestcase( + gameID int, + userID int, + code string, + submissionID int, + testcaseID int, + stdin string, + stdout string, +) error { + task, err := newTaskRunTestcase( + gameID, + userID, + code, + submissionID, + testcaseID, + stdin, + stdout, + ) + if err != nil { + return err + } + _, err = q.client.Enqueue(task) + return err } diff --git a/backend/taskqueue/tasks.go b/backend/taskqueue/tasks.go index cd67948..cbe83b1 100644 --- a/backend/taskqueue/tasks.go +++ b/backend/taskqueue/tasks.go @@ -6,24 +6,191 @@ import ( "github.com/hibiken/asynq" ) +type TaskType string + const ( - TaskTypeExec = "exec" + TaskTypeCreateSubmissionRecord TaskType = "create_submission_record" + TaskTypeCompileSwiftToWasm TaskType = "compile_swift_to_wasm" + TaskTypeCompileWasmToNativeExecutable TaskType = "compile_wasm_to_native_executable" + TaskTypeRunTestcase TaskType = "run_testcase" ) -type TaskExecPlayload struct { +type TaskPayloadBase struct { GameID int UserID int Code string } -func NewExecTask(gameID, userID int, code string) (*asynq.Task, error) { - payload, err := json.Marshal(TaskExecPlayload{ - GameID: gameID, - UserID: userID, - Code: code, +type TaskPayloadCreateSubmissionRecord struct { + TaskPayloadBase + CodeSize int +} + +func newTaskCreateSubmissionRecord( + gameID int, + userID int, + code string, + codeSize int, +) (*asynq.Task, error) { + payload, err := json.Marshal(TaskPayloadCreateSubmissionRecord{ + TaskPayloadBase: TaskPayloadBase{ + GameID: gameID, + UserID: userID, + Code: code, + }, + CodeSize: codeSize, + }) + if err != nil { + return nil, err + } + return asynq.NewTask(string(TaskTypeCreateSubmissionRecord), payload), nil +} + +func (t *TaskPayloadCreateSubmissionRecord) GameID() int { return t.TaskPayloadBase.GameID } +func (t *TaskPayloadCreateSubmissionRecord) UserID() int { return t.TaskPayloadBase.UserID } +func (t *TaskPayloadCreateSubmissionRecord) Code() string { return t.TaskPayloadBase.Code } + +type TaskPayloadCompileSwiftToWasm struct { + TaskPayloadBase + SubmissionID int +} + +func newTaskCompileSwiftToWasm( + gameID int, + userID int, + code string, + submissionID int, +) (*asynq.Task, error) { + payload, err := json.Marshal(TaskPayloadCompileSwiftToWasm{ + TaskPayloadBase: TaskPayloadBase{ + GameID: gameID, + UserID: userID, + Code: code, + }, + SubmissionID: submissionID, + }) + if err != nil { + return nil, err + } + return asynq.NewTask(string(TaskTypeCompileSwiftToWasm), payload), nil +} + +func (t *TaskPayloadCompileSwiftToWasm) GameID() int { return t.TaskPayloadBase.GameID } +func (t *TaskPayloadCompileSwiftToWasm) UserID() int { return t.TaskPayloadBase.UserID } +func (t *TaskPayloadCompileSwiftToWasm) Code() string { return t.TaskPayloadBase.Code } + +type TaskPayloadCompileWasmToNativeExecutable struct { + TaskPayloadBase + SubmissionID int +} + +func newTaskCompileWasmToNativeExecutable( + gameID int, + userID int, + code string, + submissionID int, +) (*asynq.Task, error) { + payload, err := json.Marshal(TaskPayloadCompileWasmToNativeExecutable{ + TaskPayloadBase: TaskPayloadBase{ + GameID: gameID, + UserID: userID, + Code: code, + }, + SubmissionID: submissionID, + }) + if err != nil { + return nil, err + } + return asynq.NewTask(string(TaskTypeCompileWasmToNativeExecutable), payload), nil +} + +func (t *TaskPayloadCompileWasmToNativeExecutable) GameID() int { return t.TaskPayloadBase.GameID } +func (t *TaskPayloadCompileWasmToNativeExecutable) UserID() int { return t.TaskPayloadBase.UserID } +func (t *TaskPayloadCompileWasmToNativeExecutable) Code() string { return t.TaskPayloadBase.Code } + +type TaskPayloadRunTestcase struct { + TaskPayloadBase + SubmissionID int + TestcaseID int + Stdin string + Stdout string +} + +func newTaskRunTestcase( + gameID int, + userID int, + code string, + submissionID int, + testcaseID int, + stdin string, + stdout string, +) (*asynq.Task, error) { + payload, err := json.Marshal(TaskPayloadRunTestcase{ + TaskPayloadBase: TaskPayloadBase{ + GameID: gameID, + UserID: userID, + Code: code, + }, + SubmissionID: submissionID, + TestcaseID: testcaseID, + Stdin: stdin, + Stdout: stdout, }) if err != nil { return nil, err } - return asynq.NewTask(TaskTypeExec, payload), nil + return asynq.NewTask(string(TaskTypeRunTestcase), payload), nil +} + +func (t *TaskPayloadRunTestcase) GameID() int { return t.TaskPayloadBase.GameID } +func (t *TaskPayloadRunTestcase) UserID() int { return t.TaskPayloadBase.UserID } +func (t *TaskPayloadRunTestcase) Code() string { return t.TaskPayloadBase.Code } + +type TaskResult interface { + Type() TaskType + GameID() int +} + +type TaskResultCreateSubmissionRecord struct { + TaskPayload *TaskPayloadCreateSubmissionRecord + SubmissionID int + Err error } + +func (r *TaskResultCreateSubmissionRecord) Type() TaskType { return TaskTypeCreateSubmissionRecord } +func (r *TaskResultCreateSubmissionRecord) GameID() int { return r.TaskPayload.GameID() } + +type TaskResultCompileSwiftToWasm struct { + TaskPayload *TaskPayloadCompileSwiftToWasm + Status string + Stdout string + Stderr string + Err error +} + +func (r *TaskResultCompileSwiftToWasm) Type() TaskType { return TaskTypeCompileSwiftToWasm } +func (r *TaskResultCompileSwiftToWasm) GameID() int { return r.TaskPayload.GameID() } + +type TaskResultCompileWasmToNativeExecutable struct { + TaskPayload *TaskPayloadCompileWasmToNativeExecutable + Status string + Stdout string + Stderr string + Err error +} + +func (r *TaskResultCompileWasmToNativeExecutable) Type() TaskType { + return TaskTypeCompileWasmToNativeExecutable +} +func (r *TaskResultCompileWasmToNativeExecutable) GameID() int { return r.TaskPayload.GameID() } + +type TaskResultRunTestcase struct { + TaskPayload *TaskPayloadRunTestcase + Status string + Stdout string + Stderr string + Err error +} + +func (r *TaskResultRunTestcase) Type() TaskType { return TaskTypeRunTestcase } +func (r *TaskResultRunTestcase) GameID() int { return r.TaskPayload.GameID() } diff --git a/backend/taskqueue/worker_server.go b/backend/taskqueue/worker_server.go index 485d6d3..317f61c 100644 --- a/backend/taskqueue/worker_server.go +++ b/backend/taskqueue/worker_server.go @@ -7,27 +7,35 @@ import ( ) type WorkerServer struct { - server *asynq.Server - queries *db.Queries - c chan string + server *asynq.Server + processor *processorWrapper } -func NewWorkerServer(redisAddr string, queries *db.Queries, c chan string) *WorkerServer { +func NewWorkerServer(redisAddr string, queries *db.Queries) *WorkerServer { + server := asynq.NewServer( + asynq.RedisClientOpt{ + Addr: redisAddr, + }, + asynq.Config{}, + ) + processor := newProcessorWrapper(newProcessor(queries)) return &WorkerServer{ - server: asynq.NewServer( - asynq.RedisClientOpt{ - Addr: redisAddr, - }, - asynq.Config{}, - ), - queries: queries, - c: c, + server: server, + processor: processor, } } func (s *WorkerServer) Run() error { mux := asynq.NewServeMux() - mux.Handle(TaskTypeExec, NewExecProcessor(s.queries, s.c)) + + mux.HandleFunc(string(TaskTypeCreateSubmissionRecord), s.processor.processTaskCreateSubmissionRecord) + mux.HandleFunc(string(TaskTypeCompileSwiftToWasm), s.processor.processTaskCompileSwiftToWasm) + mux.HandleFunc(string(TaskTypeCompileWasmToNativeExecutable), s.processor.processTaskCompileWasmToNativeExecutable) + mux.HandleFunc(string(TaskTypeRunTestcase), s.processor.processTaskRunTestcase) return s.server.Run(mux) } + +func (s *WorkerServer) Results() chan TaskResult { + return s.processor.results +} diff --git a/worker/exec.go b/worker/exec.go index 2ef16fa..fb238c3 100644 --- a/worker/exec.go +++ b/worker/exec.go @@ -77,7 +77,7 @@ func convertCommandErrorToResultType(err error) string { if err == context.DeadlineExceeded { return resultTimeout } else { - return resultFailure + return resultRuntimeError } } else { return resultSuccess @@ -95,7 +95,7 @@ func execSwiftCompile( if err := os.WriteFile(inPath, []byte(code), 0644); err != nil { return swiftCompileResponseData{ - Result: resultInternalError, + Status: resultInternalError, Stdout: "", Stderr: err.Error(), } @@ -116,7 +116,7 @@ func execSwiftCompile( ) return swiftCompileResponseData{ - Result: convertCommandErrorToResultType(err), + Status: convertCommandErrorToResultType(err), Stdout: stdout, Stderr: stderr, } @@ -148,7 +148,7 @@ func execWasmCompile( ) return wasmCompileResponseData{ - Result: convertCommandErrorToResultType(err), + Status: convertCommandErrorToResultType(err), Stdout: stdout, Stderr: stderr, } @@ -179,7 +179,7 @@ func execTestRun( ) return testRunResponseData{ - Result: convertCommandErrorToResultType(err), + Status: convertCommandErrorToResultType(err), Stdout: stdout, Stderr: stderr, } diff --git a/worker/models.go b/worker/models.go index b838fe0..c60002c 100644 --- a/worker/models.go +++ b/worker/models.go @@ -7,7 +7,7 @@ import ( const ( resultSuccess = "success" - resultFailure = "failure" + resultRuntimeError = "runtime_error" resultTimeout = "timeout" resultInternalError = "internal_error" ) @@ -33,7 +33,7 @@ func (req *swiftCompileRequestData) validate() error { } type swiftCompileResponseData struct { - Result string `json:"result"` + Status string `json:"status"` Stdout string `json:"stdout"` Stderr string `json:"stderr"` } @@ -44,7 +44,7 @@ type wasmCompileRequestData struct { } type wasmCompileResponseData struct { - Result string `json:"result"` + Status string `json:"status"` Stdout string `json:"stdout"` Stderr string `json:"stderr"` } @@ -78,7 +78,7 @@ func (req *testRunRequestData) validate() error { } type testRunResponseData struct { - Result string `json:"result"` + Status string `json:"status"` Stdout string `json:"stdout"` Stderr string `json:"stderr"` } |
