diff options
| author | nsfisis <nsfisis@gmail.com> | 2024-08-05 05:09:41 +0900 |
|---|---|---|
| committer | nsfisis <nsfisis@gmail.com> | 2024-08-05 05:14:30 +0900 |
| commit | 2fc239b3f4d49f1a257523df7c7781a2141252bf (patch) | |
| tree | 5b5c826e39dbc560bea29461e65a1f712382e19d /backend | |
| parent | 1325626397703567828a903a82fc1ed8626dbad4 (diff) | |
| download | phperkaigi-2025-albatross-2fc239b3f4d49f1a257523df7c7781a2141252bf.tar.gz phperkaigi-2025-albatross-2fc239b3f4d49f1a257523df7c7781a2141252bf.tar.zst phperkaigi-2025-albatross-2fc239b3f4d49f1a257523df7c7781a2141252bf.zip | |
feat(backend): implement task queue processor
Diffstat (limited to 'backend')
| -rw-r--r-- | backend/db/query.sql.go | 79 | ||||
| -rw-r--r-- | backend/main.go | 2 | ||||
| -rw-r--r-- | backend/query.sql | 13 | ||||
| -rw-r--r-- | backend/taskqueue/processor.go | 176 | ||||
| -rw-r--r-- | backend/taskqueue/worker_server.go | 9 |
5 files changed, 271 insertions, 8 deletions
diff --git a/backend/db/query.sql.go b/backend/db/query.sql.go index 8df3bf5..89506d0 100644 --- a/backend/db/query.sql.go +++ b/backend/db/query.sql.go @@ -11,6 +11,55 @@ import ( "github.com/jackc/pgx/v5/pgtype" ) +const createSubmission = `-- name: CreateSubmission :one +INSERT INTO submissions (game_id, user_id, code, code_size) +VALUES ($1, $2, $3, $4) +RETURNING submission_id +` + +type CreateSubmissionParams struct { + GameID int32 + UserID int32 + Code string + CodeSize int32 +} + +func (q *Queries) CreateSubmission(ctx context.Context, arg CreateSubmissionParams) (int32, error) { + row := q.db.QueryRow(ctx, createSubmission, + arg.GameID, + arg.UserID, + arg.Code, + arg.CodeSize, + ) + var submission_id int32 + err := row.Scan(&submission_id) + return submission_id, err +} + +const createTestcaseExecution = `-- name: CreateTestcaseExecution :exec +INSERT INTO testcase_executions (submission_id, testcase_id, status, stdout, stderr) +VALUES ($1, $2, $3, $4, $5) +` + +type CreateTestcaseExecutionParams struct { + SubmissionID int32 + TestcaseID *int32 + Status string + Stdout string + Stderr string +} + +func (q *Queries) CreateTestcaseExecution(ctx context.Context, arg CreateTestcaseExecutionParams) error { + _, err := q.db.Exec(ctx, createTestcaseExecution, + arg.SubmissionID, + arg.TestcaseID, + arg.Status, + arg.Stdout, + arg.Stderr, + ) + return err +} + const getGameByID = `-- name: GetGameByID :one SELECT game_id, game_type, state, display_name, duration_seconds, created_at, started_at, games.problem_id, problems.problem_id, title, description FROM games LEFT JOIN problems ON games.problem_id = problems.problem_id @@ -263,6 +312,36 @@ func (q *Queries) ListGamesForPlayer(ctx context.Context, userID int32) ([]ListG return items, nil } +const listTestcasesByGameID = `-- name: ListTestcasesByGameID :many +SELECT testcase_id, problem_id, stdin, stdout FROM testcases +WHERE testcases.problem_id = (SELECT problem_id FROM games WHERE game_id = $1) +` + +func (q *Queries) ListTestcasesByGameID(ctx context.Context, gameID int32) ([]Testcase, error) { + rows, err := q.db.Query(ctx, listTestcasesByGameID, gameID) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Testcase + for rows.Next() { + var i Testcase + if err := rows.Scan( + &i.TestcaseID, + &i.ProblemID, + &i.Stdin, + &i.Stdout, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const listUsers = `-- name: ListUsers :many SELECT user_id, username, display_name, icon_path, is_admin, created_at FROM users ` diff --git a/backend/main.go b/backend/main.go index a213c6c..7cfbe2f 100644 --- a/backend/main.go +++ b/backend/main.go @@ -61,7 +61,7 @@ func main() { e.Use(middleware.Recover()) taskQueue := taskqueue.NewQueue("task-db:6379") - workerServer := taskqueue.NewWorkerServer("task-db:6379") + workerServer := taskqueue.NewWorkerServer("task-db:6379", queries) gameHubs := game.NewGameHubs(queries, taskQueue) err = gameHubs.RestoreFromDB(ctx) diff --git a/backend/query.sql b/backend/query.sql index 245d5cf..6395b9b 100644 --- a/backend/query.sql +++ b/backend/query.sql @@ -53,3 +53,16 @@ SET started_at = $6, problem_id = $7 WHERE game_id = $1; + +-- name: CreateSubmission :one +INSERT INTO submissions (game_id, user_id, code, code_size) +VALUES ($1, $2, $3, $4) +RETURNING submission_id; + +-- name: ListTestcasesByGameID :many +SELECT * FROM testcases +WHERE testcases.problem_id = (SELECT problem_id FROM games WHERE game_id = $1); + +-- name: CreateTestcaseExecution :exec +INSERT INTO testcase_executions (submission_id, testcase_id, status, stdout, stderr) +VALUES ($1, $2, $3, $4, $5); diff --git a/backend/taskqueue/processor.go b/backend/taskqueue/processor.go index e26ac64..1105da5 100644 --- a/backend/taskqueue/processor.go +++ b/backend/taskqueue/processor.go @@ -1,25 +1,193 @@ package taskqueue import ( + "bytes" "context" "encoding/json" "fmt" + "net/http" + "strings" "github.com/hibiken/asynq" + + "github.com/nsfisis/iosdc-japan-2024-albatross/backend/db" ) type ExecProcessor struct { + q *db.Queries +} + +func NewExecProcessor(q *db.Queries) *ExecProcessor { + return &ExecProcessor{ + q: q, + } } -func (processor *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { +func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { var payload TaskExecPlayload if err := json.Unmarshal(t.Payload(), &payload); err != nil { return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) } - // TODO + + // TODO: upsert + // Create submission record. + submissionID, err := p.q.CreateSubmission(ctx, db.CreateSubmissionParams{ + GameID: int32(payload.GameID), + UserID: int32(payload.UserID), + Code: payload.Code, + CodeSize: int32(len(payload.Code)), // TODO: exclude whitespaces. + }) + if err != nil { + return fmt.Errorf("CreateSubmission failed: %v", err) + } + + { + type swiftcRequestData struct { + MaxDuration int `json:"max_duration_ms"` + Code string `json:"code"` + } + type swiftcResponseData struct { + Result string `json:"result"` + Stdout string `json:"stdout"` + Stderr string `json:"stderr"` + } + reqData := swiftcRequestData{ + MaxDuration: 5000, + Code: payload.Code, + } + reqJson, err := json.Marshal(reqData) + if err != nil { + return fmt.Errorf("json.Marshal failed: %v", err) + } + res, err := http.Post("http://worker:80/api/swiftc", "application/json", bytes.NewBuffer(reqJson)) + if err != nil { + return fmt.Errorf("http.Post failed: %v", err) + } + resData := swiftcResponseData{} + if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { + return fmt.Errorf("json.Decode failed: %v", err) + } + if resData.Result != "success" { + err := p.q.CreateTestcaseExecution(ctx, db.CreateTestcaseExecutionParams{ + SubmissionID: submissionID, + TestcaseID: nil, + Status: "compile_error", + Stdout: resData.Stdout, + Stderr: resData.Stderr, + }) + if err != nil { + return fmt.Errorf("CreateTestcaseExecution failed: %v", err) + } + return fmt.Errorf("swiftc failed: %v", resData.Stderr) + } + } + { + type wasmcRequestData struct { + MaxDuration int `json:"max_duration_ms"` + Code string `json:"code"` + } + type wasmcResponseData struct { + Result string `json:"result"` + Stdout string `json:"stdout"` + Stderr string `json:"stderr"` + } + reqData := wasmcRequestData{ + MaxDuration: 5000, + Code: payload.Code, + } + reqJson, err := json.Marshal(reqData) + if err != nil { + return fmt.Errorf("json.Marshal failed: %v", err) + } + res, err := http.Post("http://worker:80/api/wasmc", "application/json", bytes.NewBuffer(reqJson)) + if err != nil { + return fmt.Errorf("http.Post failed: %v", err) + } + resData := wasmcResponseData{} + if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { + return fmt.Errorf("json.Decode failed: %v", err) + } + if resData.Result != "success" { + err := p.q.CreateTestcaseExecution(ctx, db.CreateTestcaseExecutionParams{ + SubmissionID: submissionID, + TestcaseID: nil, + Status: "compile_error", + Stdout: resData.Stdout, + Stderr: resData.Stderr, + }) + if err != nil { + return fmt.Errorf("CreateTestcaseExecution failed: %v", err) + } + return fmt.Errorf("wasmc failed: %v", resData.Stderr) + } + } + + testcases, err := p.q.ListTestcasesByGameID(ctx, int32(payload.GameID)) + if err != nil { + return fmt.Errorf("ListTestcasesByGameID failed: %v", err) + } + + for _, testcase := range testcases { + type testrunRequestData struct { + MaxDuration int `json:"max_duration_ms"` + Code string `json:"code"` + Stdin string `json:"stdin"` + } + type testrunResponseData struct { + Result string `json:"result"` + Stdout string `json:"stdout"` + Stderr string `json:"stderr"` + } + reqData := testrunRequestData{ + MaxDuration: 5000, + Code: payload.Code, + Stdin: testcase.Stdin, + } + reqJson, err := json.Marshal(reqData) + if err != nil { + return fmt.Errorf("json.Marshal failed: %v", err) + } + res, err := http.Post("http://worker:80/api/testrun", "application/json", bytes.NewBuffer(reqJson)) + if err != nil { + return fmt.Errorf("http.Post failed: %v", err) + } + resData := testrunResponseData{} + if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { + return fmt.Errorf("json.Decode failed: %v", err) + } + if resData.Result != "success" { + err := p.q.CreateTestcaseExecution(ctx, db.CreateTestcaseExecutionParams{ + SubmissionID: submissionID, + TestcaseID: &testcase.TestcaseID, + Status: resData.Result, + Stdout: resData.Stdout, + Stderr: resData.Stderr, + }) + if err != nil { + return fmt.Errorf("CreateTestcaseExecution failed: %v", err) + } + return fmt.Errorf("testrun failed: %v", resData.Stderr) + } + if isTestcaseExecutionCorrect(testcase.Stdout, resData.Stdout) { + err := p.q.CreateTestcaseExecution(ctx, db.CreateTestcaseExecutionParams{ + SubmissionID: submissionID, + TestcaseID: &testcase.TestcaseID, + Status: "wrong_answer", + Stdout: resData.Stdout, + Stderr: resData.Stderr, + }) + if err != nil { + return fmt.Errorf("CreateTestcaseExecution failed: %v", err) + } + return fmt.Errorf("testrun failed: %v", resData.Stdout) + } + } + return nil } -func NewExecProcessor() *ExecProcessor { - return &ExecProcessor{} +func isTestcaseExecutionCorrect(expectedStdout, actualStdout string) bool { + expectedStdout = strings.TrimSpace(expectedStdout) + actualStdout = strings.TrimSpace(actualStdout) + return actualStdout == expectedStdout } diff --git a/backend/taskqueue/worker_server.go b/backend/taskqueue/worker_server.go index 9bdd81f..09d9761 100644 --- a/backend/taskqueue/worker_server.go +++ b/backend/taskqueue/worker_server.go @@ -2,13 +2,16 @@ package taskqueue import ( "github.com/hibiken/asynq" + + "github.com/nsfisis/iosdc-japan-2024-albatross/backend/db" ) type WorkerServer struct { - server *asynq.Server + server *asynq.Server + queries *db.Queries } -func NewWorkerServer(redisAddr string) *WorkerServer { +func NewWorkerServer(redisAddr string, queries *db.Queries) *WorkerServer { return &WorkerServer{ server: asynq.NewServer( asynq.RedisClientOpt{ @@ -21,7 +24,7 @@ func NewWorkerServer(redisAddr string) *WorkerServer { func (s *WorkerServer) Run() error { mux := asynq.NewServeMux() - mux.Handle(TaskTypeExec, NewExecProcessor()) + mux.Handle(TaskTypeExec, NewExecProcessor(s.queries)) return s.server.Run(mux) } |
