aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authornsfisis <nsfisis@gmail.com>2024-08-08 19:30:20 +0900
committernsfisis <nsfisis@gmail.com>2024-08-08 19:30:20 +0900
commit9e5500269746d3826382a6dec78f0e82cfda0d42 (patch)
treed8f15d4081132d6cfff3ea3b02f197b2d6bad700
parent401a28944fc0408811aedadd1c3104e2e2d4d7fe (diff)
parent113c83b19acc58fbd46e8acdac67ff1a112d0d8c (diff)
downloadphperkaigi-2025-albatross-9e5500269746d3826382a6dec78f0e82cfda0d42.tar.gz
phperkaigi-2025-albatross-9e5500269746d3826382a6dec78f0e82cfda0d42.tar.zst
phperkaigi-2025-albatross-9e5500269746d3826382a6dec78f0e82cfda0d42.zip
Merge branch 'feat/taskqueue'
-rw-r--r--backend/db/models.go24
-rw-r--r--backend/db/query.sql.go56
-rw-r--r--backend/fixtures/dev.sql1
-rw-r--r--backend/game/hub.go352
-rw-r--r--backend/gen/api/handler_wrapper_gen.go (renamed from backend/gen/api_handler_wrapper_gen.go)0
-rw-r--r--backend/gen/gen.go5
-rw-r--r--backend/gen/taskqueue/processor_wrapper_gen.go132
-rw-r--r--backend/main.go6
-rw-r--r--backend/query.sql22
-rw-r--r--backend/schema.sql31
-rw-r--r--backend/taskqueue/processor.go289
-rw-r--r--backend/taskqueue/processor_wrapper.go112
-rw-r--r--backend/taskqueue/queue.go82
-rw-r--r--backend/taskqueue/tasks.go183
-rw-r--r--backend/taskqueue/worker_server.go34
-rw-r--r--worker/exec.go10
-rw-r--r--worker/models.go8
17 files changed, 1092 insertions, 255 deletions
diff --git a/backend/db/models.go b/backend/db/models.go
index 431d860..d4cf98b 100644
--- a/backend/db/models.go
+++ b/backend/db/models.go
@@ -39,6 +39,15 @@ type Submission struct {
CreatedAt pgtype.Timestamp
}
+type SubmissionResult struct {
+ SubmissionResultID int32
+ SubmissionID int32
+ Status string
+ Stdout string
+ Stderr string
+ CreatedAt pgtype.Timestamp
+}
+
type Testcase struct {
TestcaseID int32
ProblemID int32
@@ -46,13 +55,14 @@ type Testcase struct {
Stdout string
}
-type TestcaseExecution struct {
- TestcaseExecutionID int32
- SubmissionID int32
- TestcaseID *int32
- Status string
- Stdout string
- Stderr string
+type TestcaseResult struct {
+ TestcaseResultID int32
+ SubmissionID int32
+ TestcaseID int32
+ Status string
+ Stdout string
+ Stderr string
+ CreatedAt pgtype.Timestamp
}
type User struct {
diff --git a/backend/db/query.sql.go b/backend/db/query.sql.go
index 18acfda..5475067 100644
--- a/backend/db/query.sql.go
+++ b/backend/db/query.sql.go
@@ -11,6 +11,28 @@ import (
"github.com/jackc/pgx/v5/pgtype"
)
+const aggregateTestcaseResults = `-- name: AggregateTestcaseResults :one
+SELECT
+ CASE
+ WHEN COUNT(CASE WHEN r.status IS NULL THEN 1 END) > 0 THEN 'running'
+ WHEN COUNT(CASE WHEN r.status = 'internal_error' THEN 1 END) > 0 THEN 'internal_error'
+ WHEN COUNT(CASE WHEN r.status = 'timeout' THEN 1 END) > 0 THEN 'timeout'
+ WHEN COUNT(CASE WHEN r.status = 'runtime_error' THEN 1 END) > 0 THEN 'runtime_error'
+ WHEN COUNT(CASE WHEN r.status = 'wrong_answer' THEN 1 END) > 0 THEN 'wrong_answer'
+ ELSE 'success'
+ END AS status
+FROM testcases
+LEFT JOIN testcase_results AS r ON testcases.testcase_id = r.testcase_id
+WHERE r.submission_id = $1
+`
+
+func (q *Queries) AggregateTestcaseResults(ctx context.Context, submissionID int32) (string, error) {
+ row := q.db.QueryRow(ctx, aggregateTestcaseResults, submissionID)
+ var status string
+ err := row.Scan(&status)
+ return status, err
+}
+
const createSubmission = `-- name: CreateSubmission :one
INSERT INTO submissions (game_id, user_id, code, code_size)
VALUES ($1, $2, $3, $4)
@@ -36,21 +58,43 @@ func (q *Queries) CreateSubmission(ctx context.Context, arg CreateSubmissionPara
return submission_id, err
}
-const createTestcaseExecution = `-- name: CreateTestcaseExecution :exec
-INSERT INTO testcase_executions (submission_id, testcase_id, status, stdout, stderr)
+const createSubmissionResult = `-- name: CreateSubmissionResult :exec
+INSERT INTO submission_results (submission_id, status, stdout, stderr)
+VALUES ($1, $2, $3, $4)
+`
+
+type CreateSubmissionResultParams struct {
+ SubmissionID int32
+ Status string
+ Stdout string
+ Stderr string
+}
+
+func (q *Queries) CreateSubmissionResult(ctx context.Context, arg CreateSubmissionResultParams) error {
+ _, err := q.db.Exec(ctx, createSubmissionResult,
+ arg.SubmissionID,
+ arg.Status,
+ arg.Stdout,
+ arg.Stderr,
+ )
+ return err
+}
+
+const createTestcaseResult = `-- name: CreateTestcaseResult :exec
+INSERT INTO testcase_results (submission_id, testcase_id, status, stdout, stderr)
VALUES ($1, $2, $3, $4, $5)
`
-type CreateTestcaseExecutionParams struct {
+type CreateTestcaseResultParams struct {
SubmissionID int32
- TestcaseID *int32
+ TestcaseID int32
Status string
Stdout string
Stderr string
}
-func (q *Queries) CreateTestcaseExecution(ctx context.Context, arg CreateTestcaseExecutionParams) error {
- _, err := q.db.Exec(ctx, createTestcaseExecution,
+func (q *Queries) CreateTestcaseResult(ctx context.Context, arg CreateTestcaseResultParams) error {
+ _, err := q.db.Exec(ctx, createTestcaseResult,
arg.SubmissionID,
arg.TestcaseID,
arg.Status,
diff --git a/backend/fixtures/dev.sql b/backend/fixtures/dev.sql
index 89f709a..2daa8f5 100644
--- a/backend/fixtures/dev.sql
+++ b/backend/fixtures/dev.sql
@@ -54,5 +54,6 @@ VALUES
INSERT INTO testcases
(problem_id, stdin, stdout)
VALUES
+ (1, '', '42'),
(4, '', '42'),
(7, '', '42');
diff --git a/backend/game/hub.go b/backend/game/hub.go
index 719b216..11a466b 100644
--- a/backend/game/hub.go
+++ b/backend/game/hub.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"log"
+ "strings"
"time"
"github.com/jackc/pgx/v5/pgtype"
@@ -33,7 +34,7 @@ type gameHub struct {
watchers map[*watcherClient]bool
registerWatcher chan *watcherClient
unregisterWatcher chan *watcherClient
- testcaseExecution chan string
+ taskResults chan taskqueue.TaskResult
}
func newGameHub(ctx context.Context, game *game, q *db.Queries, taskQueue *taskqueue.Queue) *gameHub {
@@ -49,7 +50,7 @@ func newGameHub(ctx context.Context, game *game, q *db.Queries, taskQueue *taskq
watchers: make(map[*watcherClient]bool),
registerWatcher: make(chan *watcherClient),
unregisterWatcher: make(chan *watcherClient),
- testcaseExecution: make(chan string),
+ taskResults: make(chan taskqueue.TaskResult),
}
}
@@ -161,26 +162,19 @@ func (hub *gameHub) run() {
// TODO: assert game state is gaming
log.Printf("submit: %v", message.message)
code := msg.Data.Code
- task, err := taskqueue.NewExecTask(hub.game.gameID, message.client.playerID, code)
- if err != nil {
- log.Fatalf("failed to create task: %v", err)
+ codeSize := len(code) // TODO: exclude whitespaces.
+ if err := hub.taskQueue.EnqueueTaskCreateSubmissionRecord(
+ hub.game.gameID,
+ message.client.playerID,
+ code,
+ codeSize,
+ ); err != nil {
+ // TODO: notify failure to player
+ log.Fatalf("failed to enqueue task: %v", err)
}
- hub.taskQueue.Enqueue(task)
default:
log.Printf("unexpected message type: %T", message.message)
}
- case executionStatus := <-hub.testcaseExecution:
- _ = executionStatus
- for player := range hub.players {
- player.s2cMessages <- &playerMessageS2CExecResult{
- Type: playerMessageTypeS2CExecResult,
- Data: playerMessageS2CExecResultPayload{
- Score: nil,
- Status: api.GamePlayerMessageS2CExecResultPayloadStatus(executionStatus),
- },
- }
- }
- // broadcast to watchers
case <-ticker.C:
if hub.game.state == gameStateStarting {
if time.Now().After(*hub.game.startedAt) {
@@ -211,6 +205,296 @@ func (hub *gameHub) run() {
}
}
+type codeSubmissionError struct {
+ Status string
+ Stdout string
+ Stderr string
+}
+
+func (err *codeSubmissionError) Error() string {
+ return err.Stderr
+}
+
+func (hub *gameHub) processTaskResults() {
+ for taskResult := range hub.taskResults {
+ switch taskResult := taskResult.(type) {
+ case *taskqueue.TaskResultCreateSubmissionRecord:
+ err := hub.processTaskResultCreateSubmissionRecord(taskResult)
+ if err != nil {
+ for player := range hub.players {
+ if player.playerID != taskResult.TaskPayload.UserID() {
+ continue
+ }
+ player.s2cMessages <- &playerMessageS2CExecResult{
+ Type: playerMessageTypeS2CExecResult,
+ Data: playerMessageS2CExecResultPayload{
+ Score: nil,
+ Status: api.GamePlayerMessageS2CExecResultPayloadStatus(err.Status),
+ },
+ }
+ }
+ // TODO: broadcast to watchers
+ }
+ case *taskqueue.TaskResultCompileSwiftToWasm:
+ err := hub.processTaskResultCompileSwiftToWasm(taskResult)
+ if err != nil {
+ for player := range hub.players {
+ if player.playerID != taskResult.TaskPayload.UserID() {
+ continue
+ }
+ player.s2cMessages <- &playerMessageS2CExecResult{
+ Type: playerMessageTypeS2CExecResult,
+ Data: playerMessageS2CExecResultPayload{
+ Score: nil,
+ Status: api.GamePlayerMessageS2CExecResultPayloadStatus(err.Status),
+ },
+ }
+ }
+ // TODO: broadcast to watchers
+ }
+ case *taskqueue.TaskResultCompileWasmToNativeExecutable:
+ err := hub.processTaskResultCompileWasmToNativeExecutable(taskResult)
+ if err != nil {
+ for player := range hub.players {
+ if player.playerID != taskResult.TaskPayload.UserID() {
+ continue
+ }
+ player.s2cMessages <- &playerMessageS2CExecResult{
+ Type: playerMessageTypeS2CExecResult,
+ Data: playerMessageS2CExecResultPayload{
+ Score: nil,
+ Status: api.GamePlayerMessageS2CExecResultPayloadStatus(err.Status),
+ },
+ }
+ }
+ // TODO: broadcast to watchers
+ }
+ case *taskqueue.TaskResultRunTestcase:
+ var err error
+ err = hub.processTaskResultRunTestcase(taskResult)
+ _ = err // TODO: handle err?
+ aggregatedStatus, err := hub.q.AggregateTestcaseResults(hub.ctx, int32(taskResult.TaskPayload.SubmissionID))
+ _ = err // TODO: handle err?
+ err = hub.q.CreateSubmissionResult(hub.ctx, db.CreateSubmissionResultParams{
+ SubmissionID: int32(taskResult.TaskPayload.SubmissionID),
+ Status: aggregatedStatus,
+ Stdout: "",
+ Stderr: "",
+ })
+ if err != nil {
+ for player := range hub.players {
+ if player.playerID != taskResult.TaskPayload.UserID() {
+ continue
+ }
+ player.s2cMessages <- &playerMessageS2CExecResult{
+ Type: playerMessageTypeS2CExecResult,
+ Data: playerMessageS2CExecResultPayload{
+ Score: nil,
+ Status: api.GamePlayerMessageS2CExecResultPayloadStatus("internal_error"),
+ },
+ }
+ }
+ // TODO: broadcast to watchers
+ continue
+ }
+ for player := range hub.players {
+ if player.playerID != taskResult.TaskPayload.UserID() {
+ continue
+ }
+ player.s2cMessages <- &playerMessageS2CExecResult{
+ Type: playerMessageTypeS2CExecResult,
+ Data: playerMessageS2CExecResultPayload{
+ Score: nil,
+ Status: api.GamePlayerMessageS2CExecResultPayloadStatus(aggregatedStatus),
+ },
+ }
+ // TODO: broadcast to watchers
+ }
+ default:
+ panic("unexpected task result type")
+ }
+ }
+}
+
+func (hub *gameHub) processTaskResultCreateSubmissionRecord(
+ taskResult *taskqueue.TaskResultCreateSubmissionRecord,
+) *codeSubmissionError {
+ if taskResult.Err != nil {
+ return &codeSubmissionError{
+ Status: "internal_error",
+ Stderr: taskResult.Err.Error(),
+ }
+ }
+
+ if err := hub.taskQueue.EnqueueTaskCompileSwiftToWasm(
+ taskResult.TaskPayload.GameID(),
+ taskResult.TaskPayload.UserID(),
+ taskResult.TaskPayload.Code(),
+ taskResult.SubmissionID,
+ ); err != nil {
+ return &codeSubmissionError{
+ Status: "internal_error",
+ Stderr: err.Error(),
+ }
+ }
+ return nil
+}
+
+func (hub *gameHub) processTaskResultCompileSwiftToWasm(
+ taskResult *taskqueue.TaskResultCompileSwiftToWasm,
+) *codeSubmissionError {
+ if taskResult.Err != nil {
+ return &codeSubmissionError{
+ Status: "internal_error",
+ Stderr: taskResult.Err.Error(),
+ }
+ }
+
+ if taskResult.Status != "success" {
+ if err := hub.q.CreateSubmissionResult(hub.ctx, db.CreateSubmissionResultParams{
+ SubmissionID: int32(taskResult.TaskPayload.SubmissionID),
+ Status: taskResult.Status,
+ Stdout: taskResult.Stdout,
+ Stderr: taskResult.Stderr,
+ }); err != nil {
+ return &codeSubmissionError{
+ Status: "internal_error",
+ Stderr: err.Error(),
+ }
+ }
+ return &codeSubmissionError{
+ Status: taskResult.Status,
+ Stdout: taskResult.Stdout,
+ Stderr: taskResult.Stderr,
+ }
+ }
+ if err := hub.taskQueue.EnqueueTaskCompileWasmToNativeExecutable(
+ taskResult.TaskPayload.GameID(),
+ taskResult.TaskPayload.UserID(),
+ taskResult.TaskPayload.Code(),
+ taskResult.TaskPayload.SubmissionID,
+ ); err != nil {
+ return &codeSubmissionError{
+ Status: "internal_error",
+ Stderr: err.Error(),
+ }
+ }
+ return nil
+}
+
+func (hub *gameHub) processTaskResultCompileWasmToNativeExecutable(
+ taskResult *taskqueue.TaskResultCompileWasmToNativeExecutable,
+) *codeSubmissionError {
+ if taskResult.Err != nil {
+ return &codeSubmissionError{
+ Status: "internal_error",
+ Stderr: taskResult.Err.Error(),
+ }
+ }
+
+ if taskResult.Status != "success" {
+ if err := hub.q.CreateSubmissionResult(hub.ctx, db.CreateSubmissionResultParams{
+ SubmissionID: int32(taskResult.TaskPayload.SubmissionID),
+ Status: taskResult.Status,
+ Stdout: taskResult.Stdout,
+ Stderr: taskResult.Stderr,
+ }); err != nil {
+ return &codeSubmissionError{
+ Status: "internal_error",
+ Stderr: err.Error(),
+ }
+ }
+ return &codeSubmissionError{
+ Status: taskResult.Status,
+ Stdout: taskResult.Stdout,
+ Stderr: taskResult.Stderr,
+ }
+ }
+
+ testcases, err := hub.q.ListTestcasesByGameID(hub.ctx, int32(taskResult.TaskPayload.GameID()))
+ if err != nil {
+ return &codeSubmissionError{
+ Status: "internal_error",
+ Stderr: err.Error(),
+ }
+ }
+ if len(testcases) == 0 {
+ return &codeSubmissionError{
+ Status: "internal_error",
+ Stderr: "no testcases found",
+ }
+ }
+
+ for _, testcase := range testcases {
+ if err := hub.taskQueue.EnqueueTaskRunTestcase(
+ taskResult.TaskPayload.GameID(),
+ taskResult.TaskPayload.UserID(),
+ taskResult.TaskPayload.Code(),
+ taskResult.TaskPayload.SubmissionID,
+ int(testcase.TestcaseID),
+ testcase.Stdin,
+ testcase.Stdout,
+ ); err != nil {
+ return &codeSubmissionError{
+ Status: "internal_error",
+ Stderr: err.Error(),
+ }
+ }
+ }
+ return nil
+}
+
+func (hub *gameHub) processTaskResultRunTestcase(
+ taskResult *taskqueue.TaskResultRunTestcase,
+) *codeSubmissionError {
+ if taskResult.Err != nil {
+ return &codeSubmissionError{
+ Status: "internal_error",
+ Stderr: taskResult.Err.Error(),
+ }
+ }
+
+ if taskResult.Status != "success" {
+ if err := hub.q.CreateTestcaseResult(hub.ctx, db.CreateTestcaseResultParams{
+ SubmissionID: int32(taskResult.TaskPayload.SubmissionID),
+ TestcaseID: int32(taskResult.TaskPayload.TestcaseID),
+ Status: taskResult.Status,
+ Stdout: taskResult.Stdout,
+ Stderr: taskResult.Stderr,
+ }); err != nil {
+ return &codeSubmissionError{
+ Status: "internal_error",
+ Stderr: err.Error(),
+ }
+ }
+ return &codeSubmissionError{
+ Status: taskResult.Status,
+ Stdout: taskResult.Stdout,
+ Stderr: taskResult.Stderr,
+ }
+ }
+ if !isTestcaseResultCorrect(taskResult.TaskPayload.Stdout, taskResult.Stdout) {
+ if err := hub.q.CreateTestcaseResult(hub.ctx, db.CreateTestcaseResultParams{
+ SubmissionID: int32(taskResult.TaskPayload.SubmissionID),
+ TestcaseID: int32(taskResult.TaskPayload.TestcaseID),
+ Status: "wrong_answer",
+ Stdout: taskResult.Stdout,
+ Stderr: taskResult.Stderr,
+ }); err != nil {
+ return &codeSubmissionError{
+ Status: "internal_error",
+ Stderr: err.Error(),
+ }
+ }
+ return &codeSubmissionError{
+ Status: "wrong_answer",
+ Stdout: taskResult.Stdout,
+ Stderr: taskResult.Stderr,
+ }
+ }
+ return nil
+}
+
func (hub *gameHub) startGame() error {
for player := range hub.players {
player.s2cMessages <- &playerMessageS2CPrepare{
@@ -261,16 +545,18 @@ func (hub *gameHub) closeWatcherClient(watcher *watcherClient) {
}
type GameHubs struct {
- hubs map[int]*gameHub
- q *db.Queries
- taskQueue *taskqueue.Queue
+ hubs map[int]*gameHub
+ q *db.Queries
+ taskQueue *taskqueue.Queue
+ taskResults chan taskqueue.TaskResult
}
-func NewGameHubs(q *db.Queries, taskQueue *taskqueue.Queue) *GameHubs {
+func NewGameHubs(q *db.Queries, taskQueue *taskqueue.Queue, taskResults chan taskqueue.TaskResult) *GameHubs {
return &GameHubs{
- hubs: make(map[int]*gameHub),
- q: q,
- taskQueue: taskQueue,
+ hubs: make(map[int]*gameHub),
+ q: q,
+ taskQueue: taskQueue,
+ taskResults: taskResults,
}
}
@@ -327,6 +613,16 @@ func (hubs *GameHubs) RestoreFromDB(ctx context.Context) error {
func (hubs *GameHubs) Run() {
for _, hub := range hubs.hubs {
go hub.run()
+ go hub.processTaskResults()
+ }
+
+ for taskResult := range hubs.taskResults {
+ hub := hubs.getHub(taskResult.GameID())
+ if hub == nil {
+ log.Printf("no such game: %d", taskResult.GameID())
+ continue
+ }
+ hub.taskResults <- taskResult
}
}
@@ -342,6 +638,8 @@ func (hubs *GameHubs) StartGame(gameID int) error {
return hub.startGame()
}
-func (hubs *GameHubs) C() chan string {
- return hubs.hubs[7].testcaseExecution
+func isTestcaseResultCorrect(expectedStdout, actualStdout string) bool {
+ expectedStdout = strings.TrimSpace(expectedStdout)
+ actualStdout = strings.TrimSpace(actualStdout)
+ return actualStdout == expectedStdout
}
diff --git a/backend/gen/api_handler_wrapper_gen.go b/backend/gen/api/handler_wrapper_gen.go
index 7fd34b2..7fd34b2 100644
--- a/backend/gen/api_handler_wrapper_gen.go
+++ b/backend/gen/api/handler_wrapper_gen.go
diff --git a/backend/gen/gen.go b/backend/gen/gen.go
index 6fb430f..276bc05 100644
--- a/backend/gen/gen.go
+++ b/backend/gen/gen.go
@@ -1,5 +1,6 @@
-package main
+package gen
//go:generate go run github.com/sqlc-dev/sqlc/cmd/sqlc generate
//go:generate go run github.com/oapi-codegen/oapi-codegen/v2/cmd/oapi-codegen -config oapi-codegen.yaml ../../openapi.yaml
-//go:generate go run ./api_handler_wrapper_gen.go -i ../api/generated.go -o ../api/handler_wrapper.go
+//go:generate go run ./api/handler_wrapper_gen.go -i ../api/generated.go -o ../api/handler_wrapper.go
+//go:generate go run ./taskqueue/processor_wrapper_gen.go -i ../taskqueue/tasks.go -o ../taskqueue/processor_wrapper.go
diff --git a/backend/gen/taskqueue/processor_wrapper_gen.go b/backend/gen/taskqueue/processor_wrapper_gen.go
new file mode 100644
index 0000000..85960ae
--- /dev/null
+++ b/backend/gen/taskqueue/processor_wrapper_gen.go
@@ -0,0 +1,132 @@
+package main
+
+import (
+ "bytes"
+ "flag"
+ "go/ast"
+ "go/format"
+ "go/parser"
+ "go/token"
+ "os"
+ "slices"
+ "strings"
+ "text/template"
+)
+
+func main() {
+ inputFile := flag.String("i", "", "input file")
+ outputFile := flag.String("o", "", "output file")
+ flag.Parse()
+
+ if inputFile == nil || *inputFile == "" || outputFile == nil || *outputFile == "" {
+ flag.PrintDefaults()
+ os.Exit(1)
+ }
+
+ // Parse the input file
+ fileSet := token.NewFileSet()
+ parsedFile, err := parser.ParseFile(fileSet, *inputFile, nil, parser.SkipObjectResolution)
+ if err != nil {
+ panic(err)
+ }
+
+ // Find TaskType* constants.
+ var consts []string
+ for _, decl := range parsedFile.Decls {
+ genDecl, ok := decl.(*ast.GenDecl)
+ if !ok {
+ continue
+ }
+ for _, spec := range genDecl.Specs {
+ valueSpec, ok := spec.(*ast.ValueSpec)
+ if !ok {
+ continue
+ }
+ for _, name := range valueSpec.Names {
+ if !strings.HasPrefix(name.Name, "TaskType") {
+ continue
+ }
+ if name.Name == "TaskType" {
+ continue
+ }
+ consts = append(consts, strings.TrimPrefix(name.Name, "TaskType"))
+ }
+ }
+ }
+ if len(consts) == 0 {
+ panic("TaskType* constants not found")
+ }
+ slices.Sort(consts)
+
+ // Generate code.
+ tmpl, err := template.New("code").Parse(templateText)
+ if err != nil {
+ panic(err)
+ }
+
+ var buf bytes.Buffer
+ err = tmpl.Execute(&buf, consts)
+ if err != nil {
+ panic(err)
+ }
+
+ formatted, err := format.Source(buf.Bytes())
+ if err != nil {
+ panic(err)
+ }
+
+ err = os.WriteFile(*outputFile, formatted, 0644)
+ if err != nil {
+ panic(err)
+ }
+}
+
+const templateText = `// Code generated by go generate; DO NOT EDIT.
+
+package taskqueue
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+
+ "github.com/hibiken/asynq"
+)
+
+type processorWrapper struct {
+ impl processor
+ results chan TaskResult
+}
+
+func newProcessorWrapper(impl processor) *processorWrapper {
+ return &processorWrapper{
+ impl: impl,
+ results: make(chan TaskResult),
+ }
+}
+
+{{ range . }}
+ func (p *processorWrapper) processTask{{ . }}(ctx context.Context, t *asynq.Task) error {
+ var payload TaskPayload{{ . }}
+ if err := json.Unmarshal(t.Payload(), &payload); err != nil {
+ err := fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
+ p.results <- &TaskResult{{ . }}{Err: err}
+ return err
+ }
+
+ result, err := p.impl.doProcessTask{{ . }}(ctx, &payload)
+ if err != nil {
+ retryCount, _ := asynq.GetRetryCount(ctx)
+ maxRetry, _ := asynq.GetMaxRetry(ctx)
+ isRecoverable := !errors.Is(err, asynq.SkipRetry) && retryCount < maxRetry
+ if !isRecoverable {
+ p.results <- &TaskResult{{ . }}{Err: err}
+ }
+ return err
+ }
+ p.results <- result
+ return nil
+ }
+{{ end }}
+`
diff --git a/backend/main.go b/backend/main.go
index e3d0052..62499d6 100644
--- a/backend/main.go
+++ b/backend/main.go
@@ -61,8 +61,9 @@ func main() {
e.Use(middleware.Recover())
taskQueue := taskqueue.NewQueue("task-db:6379")
+ workerServer := taskqueue.NewWorkerServer("task-db:6379", queries)
- gameHubs := game.NewGameHubs(queries, taskQueue)
+ gameHubs := game.NewGameHubs(queries, taskQueue, workerServer.Results())
err = gameHubs.RestoreFromDB(ctx)
if err != nil {
log.Fatalf("Error restoring game hubs from db %v", err)
@@ -95,9 +96,8 @@ func main() {
return c.Redirect(http.StatusPermanentRedirect, "http://localhost:5173/logout")
})
- gameHubs.Run()
+ go gameHubs.Run()
- workerServer := taskqueue.NewWorkerServer("task-db:6379", queries, gameHubs.C())
go func() {
workerServer.Run()
}()
diff --git a/backend/query.sql b/backend/query.sql
index ea04c08..e767746 100644
--- a/backend/query.sql
+++ b/backend/query.sql
@@ -68,6 +68,24 @@ SELECT * FROM testcases
WHERE testcases.problem_id = (SELECT problem_id FROM games WHERE game_id = $1)
ORDER BY testcases.testcase_id;
--- name: CreateTestcaseExecution :exec
-INSERT INTO testcase_executions (submission_id, testcase_id, status, stdout, stderr)
+-- name: CreateSubmissionResult :exec
+INSERT INTO submission_results (submission_id, status, stdout, stderr)
+VALUES ($1, $2, $3, $4);
+
+-- name: CreateTestcaseResult :exec
+INSERT INTO testcase_results (submission_id, testcase_id, status, stdout, stderr)
VALUES ($1, $2, $3, $4, $5);
+
+-- name: AggregateTestcaseResults :one
+SELECT
+ CASE
+ WHEN COUNT(CASE WHEN r.status IS NULL THEN 1 END) > 0 THEN 'running'
+ WHEN COUNT(CASE WHEN r.status = 'internal_error' THEN 1 END) > 0 THEN 'internal_error'
+ WHEN COUNT(CASE WHEN r.status = 'timeout' THEN 1 END) > 0 THEN 'timeout'
+ WHEN COUNT(CASE WHEN r.status = 'runtime_error' THEN 1 END) > 0 THEN 'runtime_error'
+ WHEN COUNT(CASE WHEN r.status = 'wrong_answer' THEN 1 END) > 0 THEN 'wrong_answer'
+ ELSE 'success'
+ END AS status
+FROM testcases
+LEFT JOIN testcase_results AS r ON testcases.testcase_id = r.testcase_id
+WHERE r.submission_id = $1;
diff --git a/backend/schema.sql b/backend/schema.sql
index d0b6c40..74d1202 100644
--- a/backend/schema.sql
+++ b/backend/schema.sql
@@ -64,14 +64,27 @@ CREATE TABLE submissions (
CONSTRAINT fk_user_id FOREIGN KEY(user_id) REFERENCES users(user_id)
);
-CREATE TABLE testcase_executions (
- testcase_execution_id SERIAL PRIMARY KEY,
- submission_id INT NOT NULL,
- testcase_id INT,
- status VARCHAR(16) NOT NULL,
- stdout TEXT NOT NULL,
- stderr TEXT NOT NULL,
+CREATE TABLE submission_results (
+ submission_result_id SERIAL PRIMARY KEY,
+ submission_id INT NOT NULL UNIQUE,
+ status VARCHAR(16) NOT NULL,
+ stdout TEXT NOT NULL,
+ stderr TEXT NOT NULL,
+ created_at TIMESTAMP NOT NULL DEFAULT NOW(),
+ CONSTRAINT fk_submission_id FOREIGN KEY(submission_id) REFERENCES submissions(submission_id)
+);
+CREATE INDEX idx_submission_results_submission_id ON submission_results(submission_id);
+
+CREATE TABLE testcase_results (
+ testcase_result_id SERIAL PRIMARY KEY,
+ submission_id INT NOT NULL,
+ testcase_id INT NOT NULL,
+ status VARCHAR(16) NOT NULL,
+ stdout TEXT NOT NULL,
+ stderr TEXT NOT NULL,
+ created_at TIMESTAMP NOT NULL DEFAULT NOW(),
CONSTRAINT fk_submission_id FOREIGN KEY(submission_id) REFERENCES submissions(submission_id),
- CONSTRAINT fk_testcase_id FOREIGN KEY(testcase_id) REFERENCES testcases(testcase_id)
+ CONSTRAINT fk_testcase_id FOREIGN KEY(testcase_id) REFERENCES testcases(testcase_id),
+ CONSTRAINT uq_submission_id_testcase_id UNIQUE(submission_id, testcase_id)
);
-CREATE INDEX idx_testcase_executions_submission_id ON testcase_executions(submission_id);
+CREATE INDEX idx_testcase_results_submission_id ON testcase_results(submission_id);
diff --git a/backend/taskqueue/processor.go b/backend/taskqueue/processor.go
index 1d4c412..b64b01c 100644
--- a/backend/taskqueue/processor.go
+++ b/backend/taskqueue/processor.go
@@ -6,195 +6,150 @@ import (
"encoding/json"
"fmt"
"net/http"
- "strings"
-
- "github.com/hibiken/asynq"
"github.com/nsfisis/iosdc-japan-2024-albatross/backend/db"
)
-type ExecProcessor struct {
+type processor struct {
q *db.Queries
- c chan string
}
-func NewExecProcessor(q *db.Queries, c chan string) *ExecProcessor {
- return &ExecProcessor{
+func newProcessor(q *db.Queries) processor {
+ return processor{
q: q,
- c: c,
}
}
-func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
- var payload TaskExecPlayload
- if err := json.Unmarshal(t.Payload(), &payload); err != nil {
- return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
- }
-
+func (p *processor) doProcessTaskCreateSubmissionRecord(
+ ctx context.Context,
+ payload *TaskPayloadCreateSubmissionRecord,
+) (*TaskResultCreateSubmissionRecord, error) {
// TODO: upsert
- // Create submission record.
submissionID, err := p.q.CreateSubmission(ctx, db.CreateSubmissionParams{
- GameID: int32(payload.GameID),
- UserID: int32(payload.UserID),
- Code: payload.Code,
- CodeSize: int32(len(payload.Code)), // TODO: exclude whitespaces.
+ GameID: int32(payload.GameID()),
+ UserID: int32(payload.UserID()),
+ Code: payload.Code(),
+ CodeSize: int32(payload.CodeSize),
})
if err != nil {
- return fmt.Errorf("CreateSubmission failed: %v", err)
+ return nil, err
}
- {
- type swiftcRequestData struct {
- MaxDuration int `json:"max_duration_ms"`
- Code string `json:"code"`
- }
- type swiftcResponseData struct {
- Result string `json:"result"`
- Stdout string `json:"stdout"`
- Stderr string `json:"stderr"`
- }
- reqData := swiftcRequestData{
- MaxDuration: 5000,
- Code: payload.Code,
- }
- reqJson, err := json.Marshal(reqData)
- if err != nil {
- return fmt.Errorf("json.Marshal failed: %v", err)
- }
- res, err := http.Post("http://worker:80/api/swiftc", "application/json", bytes.NewBuffer(reqJson))
- if err != nil {
- return fmt.Errorf("http.Post failed: %v", err)
- }
- resData := swiftcResponseData{}
- if err := json.NewDecoder(res.Body).Decode(&resData); err != nil {
- return fmt.Errorf("json.Decode failed: %v", err)
- }
- if resData.Result != "success" {
- err := p.q.CreateTestcaseExecution(ctx, db.CreateTestcaseExecutionParams{
- SubmissionID: submissionID,
- TestcaseID: nil,
- Status: "compile_error",
- Stdout: resData.Stdout,
- Stderr: resData.Stderr,
- })
- if err != nil {
- return fmt.Errorf("CreateTestcaseExecution failed: %v", err)
- }
- p.c <- "compile_error"
- return fmt.Errorf("swiftc failed: %v", resData.Stderr)
- }
- }
- {
- type wasmcRequestData struct {
- MaxDuration int `json:"max_duration_ms"`
- Code string `json:"code"`
- }
- type wasmcResponseData struct {
- Result string `json:"result"`
- Stdout string `json:"stdout"`
- Stderr string `json:"stderr"`
- }
- reqData := wasmcRequestData{
- MaxDuration: 5000,
- Code: payload.Code,
- }
- reqJson, err := json.Marshal(reqData)
- if err != nil {
- return fmt.Errorf("json.Marshal failed: %v", err)
- }
- res, err := http.Post("http://worker:80/api/wasmc", "application/json", bytes.NewBuffer(reqJson))
- if err != nil {
- return fmt.Errorf("http.Post failed: %v", err)
- }
- resData := wasmcResponseData{}
- if err := json.NewDecoder(res.Body).Decode(&resData); err != nil {
- return fmt.Errorf("json.Decode failed: %v", err)
- }
- if resData.Result != "success" {
- err := p.q.CreateTestcaseExecution(ctx, db.CreateTestcaseExecutionParams{
- SubmissionID: submissionID,
- TestcaseID: nil,
- Status: "compile_error",
- Stdout: resData.Stdout,
- Stderr: resData.Stderr,
- })
- if err != nil {
- return fmt.Errorf("CreateTestcaseExecution failed: %v", err)
- }
- p.c <- "compile_error"
- return fmt.Errorf("wasmc failed: %v", resData.Stderr)
- }
- }
+ return &TaskResultCreateSubmissionRecord{
+ TaskPayload: payload,
+ SubmissionID: int(submissionID),
+ }, nil
+}
- testcases, err := p.q.ListTestcasesByGameID(ctx, int32(payload.GameID))
+func (p *processor) doProcessTaskCompileSwiftToWasm(
+ _ context.Context,
+ payload *TaskPayloadCompileSwiftToWasm,
+) (*TaskResultCompileSwiftToWasm, error) {
+ type swiftcRequestData struct {
+ MaxDuration int `json:"max_duration_ms"`
+ Code string `json:"code"`
+ }
+ type swiftcResponseData struct {
+ Status string `json:"status"`
+ Stdout string `json:"stdout"`
+ Stderr string `json:"stderr"`
+ }
+ reqData := swiftcRequestData{
+ MaxDuration: 5000,
+ Code: payload.Code(),
+ }
+ reqJson, err := json.Marshal(reqData)
if err != nil {
- return fmt.Errorf("ListTestcasesByGameID failed: %v", err)
+ return nil, fmt.Errorf("json.Marshal failed: %v", err)
}
-
- for _, testcase := range testcases {
- type testrunRequestData struct {
- MaxDuration int `json:"max_duration_ms"`
- Code string `json:"code"`
- Stdin string `json:"stdin"`
- }
- type testrunResponseData struct {
- Result string `json:"result"`
- Stdout string `json:"stdout"`
- Stderr string `json:"stderr"`
- }
- reqData := testrunRequestData{
- MaxDuration: 5000,
- Code: payload.Code,
- Stdin: testcase.Stdin,
- }
- reqJson, err := json.Marshal(reqData)
- if err != nil {
- return fmt.Errorf("json.Marshal failed: %v", err)
- }
- res, err := http.Post("http://worker:80/api/testrun", "application/json", bytes.NewBuffer(reqJson))
- if err != nil {
- return fmt.Errorf("http.Post failed: %v", err)
- }
- resData := testrunResponseData{}
- if err := json.NewDecoder(res.Body).Decode(&resData); err != nil {
- return fmt.Errorf("json.Decode failed: %v", err)
- }
- if resData.Result != "success" {
- err := p.q.CreateTestcaseExecution(ctx, db.CreateTestcaseExecutionParams{
- SubmissionID: submissionID,
- TestcaseID: &testcase.TestcaseID,
- Status: resData.Result,
- Stdout: resData.Stdout,
- Stderr: resData.Stderr,
- })
- if err != nil {
- return fmt.Errorf("CreateTestcaseExecution failed: %v", err)
- }
- p.c <- resData.Result
- return fmt.Errorf("testrun failed: %v", resData.Stderr)
- }
- if !isTestcaseExecutionCorrect(testcase.Stdout, resData.Stdout) {
- err := p.q.CreateTestcaseExecution(ctx, db.CreateTestcaseExecutionParams{
- SubmissionID: submissionID,
- TestcaseID: &testcase.TestcaseID,
- Status: "wrong_answer",
- Stdout: resData.Stdout,
- Stderr: resData.Stderr,
- })
- if err != nil {
- return fmt.Errorf("CreateTestcaseExecution failed: %v", err)
- }
- p.c <- "wrong_answer"
- return fmt.Errorf("testrun failed: %v", resData.Stdout)
- }
+ res, err := http.Post("http://worker:80/api/swiftc", "application/json", bytes.NewBuffer(reqJson))
+ if err != nil {
+ return nil, fmt.Errorf("http.Post failed: %v", err)
+ }
+ resData := swiftcResponseData{}
+ if err := json.NewDecoder(res.Body).Decode(&resData); err != nil {
+ return nil, fmt.Errorf("json.Decode failed: %v", err)
}
+ return &TaskResultCompileSwiftToWasm{
+ TaskPayload: payload,
+ Status: resData.Status,
+ Stdout: resData.Stdout,
+ Stderr: resData.Stderr,
+ }, nil
+}
- p.c <- "success"
- return nil
+func (p *processor) doProcessTaskCompileWasmToNativeExecutable(
+ _ context.Context,
+ payload *TaskPayloadCompileWasmToNativeExecutable,
+) (*TaskResultCompileWasmToNativeExecutable, error) {
+ type wasmcRequestData struct {
+ MaxDuration int `json:"max_duration_ms"`
+ Code string `json:"code"`
+ }
+ type wasmcResponseData struct {
+ Status string `json:"status"`
+ Stdout string `json:"stdout"`
+ Stderr string `json:"stderr"`
+ }
+ reqData := wasmcRequestData{
+ MaxDuration: 5000,
+ Code: payload.Code(),
+ }
+ reqJson, err := json.Marshal(reqData)
+ if err != nil {
+ return nil, fmt.Errorf("json.Marshal failed: %v", err)
+ }
+ res, err := http.Post("http://worker:80/api/wasmc", "application/json", bytes.NewBuffer(reqJson))
+ if err != nil {
+ return nil, fmt.Errorf("http.Post failed: %v", err)
+ }
+ resData := wasmcResponseData{}
+ if err := json.NewDecoder(res.Body).Decode(&resData); err != nil {
+ return nil, fmt.Errorf("json.Decode failed: %v", err)
+ }
+ return &TaskResultCompileWasmToNativeExecutable{
+ TaskPayload: payload,
+ Status: resData.Status,
+ Stdout: resData.Stdout,
+ Stderr: resData.Stderr,
+ }, nil
}
-func isTestcaseExecutionCorrect(expectedStdout, actualStdout string) bool {
- expectedStdout = strings.TrimSpace(expectedStdout)
- actualStdout = strings.TrimSpace(actualStdout)
- return actualStdout == expectedStdout
+func (p *processor) doProcessTaskRunTestcase(
+ _ context.Context,
+ payload *TaskPayloadRunTestcase,
+) (*TaskResultRunTestcase, error) {
+ type testrunRequestData struct {
+ MaxDuration int `json:"max_duration_ms"`
+ Code string `json:"code"`
+ Stdin string `json:"stdin"`
+ }
+ type testrunResponseData struct {
+ Status string `json:"status"`
+ Stdout string `json:"stdout"`
+ Stderr string `json:"stderr"`
+ }
+ reqData := testrunRequestData{
+ MaxDuration: 5000,
+ Code: payload.Code(),
+ Stdin: payload.Stdin,
+ }
+ reqJson, err := json.Marshal(reqData)
+ if err != nil {
+ return nil, fmt.Errorf("json.Marshal failed: %v", err)
+ }
+ res, err := http.Post("http://worker:80/api/testrun", "application/json", bytes.NewBuffer(reqJson))
+ if err != nil {
+ return nil, fmt.Errorf("http.Post failed: %v", err)
+ }
+ resData := testrunResponseData{}
+ if err := json.NewDecoder(res.Body).Decode(&resData); err != nil {
+ return nil, fmt.Errorf("json.Decode failed: %v", err)
+ }
+ return &TaskResultRunTestcase{
+ TaskPayload: payload,
+ Status: resData.Status,
+ Stdout: resData.Stdout,
+ Stderr: resData.Stderr,
+ }, nil
}
diff --git a/backend/taskqueue/processor_wrapper.go b/backend/taskqueue/processor_wrapper.go
new file mode 100644
index 0000000..b1fbd16
--- /dev/null
+++ b/backend/taskqueue/processor_wrapper.go
@@ -0,0 +1,112 @@
+// Code generated by go generate; DO NOT EDIT.
+
+package taskqueue
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+
+ "github.com/hibiken/asynq"
+)
+
+type processorWrapper struct {
+ impl processor
+ results chan TaskResult
+}
+
+func newProcessorWrapper(impl processor) *processorWrapper {
+ return &processorWrapper{
+ impl: impl,
+ results: make(chan TaskResult),
+ }
+}
+
+func (p *processorWrapper) processTaskCompileSwiftToWasm(ctx context.Context, t *asynq.Task) error {
+ var payload TaskPayloadCompileSwiftToWasm
+ if err := json.Unmarshal(t.Payload(), &payload); err != nil {
+ err := fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
+ p.results <- &TaskResultCompileSwiftToWasm{Err: err}
+ return err
+ }
+
+ result, err := p.impl.doProcessTaskCompileSwiftToWasm(ctx, &payload)
+ if err != nil {
+ retryCount, _ := asynq.GetRetryCount(ctx)
+ maxRetry, _ := asynq.GetMaxRetry(ctx)
+ isRecoverable := !errors.Is(err, asynq.SkipRetry) && retryCount < maxRetry
+ if !isRecoverable {
+ p.results <- &TaskResultCompileSwiftToWasm{Err: err}
+ }
+ return err
+ }
+ p.results <- result
+ return nil
+}
+
+func (p *processorWrapper) processTaskCompileWasmToNativeExecutable(ctx context.Context, t *asynq.Task) error {
+ var payload TaskPayloadCompileWasmToNativeExecutable
+ if err := json.Unmarshal(t.Payload(), &payload); err != nil {
+ err := fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
+ p.results <- &TaskResultCompileWasmToNativeExecutable{Err: err}
+ return err
+ }
+
+ result, err := p.impl.doProcessTaskCompileWasmToNativeExecutable(ctx, &payload)
+ if err != nil {
+ retryCount, _ := asynq.GetRetryCount(ctx)
+ maxRetry, _ := asynq.GetMaxRetry(ctx)
+ isRecoverable := !errors.Is(err, asynq.SkipRetry) && retryCount < maxRetry
+ if !isRecoverable {
+ p.results <- &TaskResultCompileWasmToNativeExecutable{Err: err}
+ }
+ return err
+ }
+ p.results <- result
+ return nil
+}
+
+func (p *processorWrapper) processTaskCreateSubmissionRecord(ctx context.Context, t *asynq.Task) error {
+ var payload TaskPayloadCreateSubmissionRecord
+ if err := json.Unmarshal(t.Payload(), &payload); err != nil {
+ err := fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
+ p.results <- &TaskResultCreateSubmissionRecord{Err: err}
+ return err
+ }
+
+ result, err := p.impl.doProcessTaskCreateSubmissionRecord(ctx, &payload)
+ if err != nil {
+ retryCount, _ := asynq.GetRetryCount(ctx)
+ maxRetry, _ := asynq.GetMaxRetry(ctx)
+ isRecoverable := !errors.Is(err, asynq.SkipRetry) && retryCount < maxRetry
+ if !isRecoverable {
+ p.results <- &TaskResultCreateSubmissionRecord{Err: err}
+ }
+ return err
+ }
+ p.results <- result
+ return nil
+}
+
+func (p *processorWrapper) processTaskRunTestcase(ctx context.Context, t *asynq.Task) error {
+ var payload TaskPayloadRunTestcase
+ if err := json.Unmarshal(t.Payload(), &payload); err != nil {
+ err := fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
+ p.results <- &TaskResultRunTestcase{Err: err}
+ return err
+ }
+
+ result, err := p.impl.doProcessTaskRunTestcase(ctx, &payload)
+ if err != nil {
+ retryCount, _ := asynq.GetRetryCount(ctx)
+ maxRetry, _ := asynq.GetMaxRetry(ctx)
+ isRecoverable := !errors.Is(err, asynq.SkipRetry) && retryCount < maxRetry
+ if !isRecoverable {
+ p.results <- &TaskResultRunTestcase{Err: err}
+ }
+ return err
+ }
+ p.results <- result
+ return nil
+}
diff --git a/backend/taskqueue/queue.go b/backend/taskqueue/queue.go
index 53ec6d6..515a406 100644
--- a/backend/taskqueue/queue.go
+++ b/backend/taskqueue/queue.go
@@ -20,6 +20,84 @@ func (q *Queue) Close() {
q.client.Close()
}
-func (q *Queue) Enqueue(task *asynq.Task, opts ...asynq.Option) (*asynq.TaskInfo, error) {
- return q.client.Enqueue(task, opts...)
+func (q *Queue) EnqueueTaskCreateSubmissionRecord(
+ gameID int,
+ userID int,
+ code string,
+ codeSize int,
+) error {
+ task, err := newTaskCreateSubmissionRecord(
+ gameID,
+ userID,
+ code,
+ codeSize,
+ )
+ if err != nil {
+ return err
+ }
+ _, err = q.client.Enqueue(task)
+ return err
+}
+
+func (q *Queue) EnqueueTaskCompileSwiftToWasm(
+ gameID int,
+ userID int,
+ code string,
+ submissionID int,
+) error {
+ task, err := newTaskCompileSwiftToWasm(
+ gameID,
+ userID,
+ code,
+ submissionID,
+ )
+ if err != nil {
+ return err
+ }
+ _, err = q.client.Enqueue(task)
+ return err
+}
+
+func (q *Queue) EnqueueTaskCompileWasmToNativeExecutable(
+ gameID int,
+ userID int,
+ code string,
+ submissionID int,
+) error {
+ task, err := newTaskCompileWasmToNativeExecutable(
+ gameID,
+ userID,
+ code,
+ submissionID,
+ )
+ if err != nil {
+ return err
+ }
+ _, err = q.client.Enqueue(task)
+ return err
+}
+
+func (q *Queue) EnqueueTaskRunTestcase(
+ gameID int,
+ userID int,
+ code string,
+ submissionID int,
+ testcaseID int,
+ stdin string,
+ stdout string,
+) error {
+ task, err := newTaskRunTestcase(
+ gameID,
+ userID,
+ code,
+ submissionID,
+ testcaseID,
+ stdin,
+ stdout,
+ )
+ if err != nil {
+ return err
+ }
+ _, err = q.client.Enqueue(task)
+ return err
}
diff --git a/backend/taskqueue/tasks.go b/backend/taskqueue/tasks.go
index cd67948..cbe83b1 100644
--- a/backend/taskqueue/tasks.go
+++ b/backend/taskqueue/tasks.go
@@ -6,24 +6,191 @@ import (
"github.com/hibiken/asynq"
)
+type TaskType string
+
const (
- TaskTypeExec = "exec"
+ TaskTypeCreateSubmissionRecord TaskType = "create_submission_record"
+ TaskTypeCompileSwiftToWasm TaskType = "compile_swift_to_wasm"
+ TaskTypeCompileWasmToNativeExecutable TaskType = "compile_wasm_to_native_executable"
+ TaskTypeRunTestcase TaskType = "run_testcase"
)
-type TaskExecPlayload struct {
+type TaskPayloadBase struct {
GameID int
UserID int
Code string
}
-func NewExecTask(gameID, userID int, code string) (*asynq.Task, error) {
- payload, err := json.Marshal(TaskExecPlayload{
- GameID: gameID,
- UserID: userID,
- Code: code,
+type TaskPayloadCreateSubmissionRecord struct {
+ TaskPayloadBase
+ CodeSize int
+}
+
+func newTaskCreateSubmissionRecord(
+ gameID int,
+ userID int,
+ code string,
+ codeSize int,
+) (*asynq.Task, error) {
+ payload, err := json.Marshal(TaskPayloadCreateSubmissionRecord{
+ TaskPayloadBase: TaskPayloadBase{
+ GameID: gameID,
+ UserID: userID,
+ Code: code,
+ },
+ CodeSize: codeSize,
+ })
+ if err != nil {
+ return nil, err
+ }
+ return asynq.NewTask(string(TaskTypeCreateSubmissionRecord), payload), nil
+}
+
+func (t *TaskPayloadCreateSubmissionRecord) GameID() int { return t.TaskPayloadBase.GameID }
+func (t *TaskPayloadCreateSubmissionRecord) UserID() int { return t.TaskPayloadBase.UserID }
+func (t *TaskPayloadCreateSubmissionRecord) Code() string { return t.TaskPayloadBase.Code }
+
+type TaskPayloadCompileSwiftToWasm struct {
+ TaskPayloadBase
+ SubmissionID int
+}
+
+func newTaskCompileSwiftToWasm(
+ gameID int,
+ userID int,
+ code string,
+ submissionID int,
+) (*asynq.Task, error) {
+ payload, err := json.Marshal(TaskPayloadCompileSwiftToWasm{
+ TaskPayloadBase: TaskPayloadBase{
+ GameID: gameID,
+ UserID: userID,
+ Code: code,
+ },
+ SubmissionID: submissionID,
+ })
+ if err != nil {
+ return nil, err
+ }
+ return asynq.NewTask(string(TaskTypeCompileSwiftToWasm), payload), nil
+}
+
+func (t *TaskPayloadCompileSwiftToWasm) GameID() int { return t.TaskPayloadBase.GameID }
+func (t *TaskPayloadCompileSwiftToWasm) UserID() int { return t.TaskPayloadBase.UserID }
+func (t *TaskPayloadCompileSwiftToWasm) Code() string { return t.TaskPayloadBase.Code }
+
+type TaskPayloadCompileWasmToNativeExecutable struct {
+ TaskPayloadBase
+ SubmissionID int
+}
+
+func newTaskCompileWasmToNativeExecutable(
+ gameID int,
+ userID int,
+ code string,
+ submissionID int,
+) (*asynq.Task, error) {
+ payload, err := json.Marshal(TaskPayloadCompileWasmToNativeExecutable{
+ TaskPayloadBase: TaskPayloadBase{
+ GameID: gameID,
+ UserID: userID,
+ Code: code,
+ },
+ SubmissionID: submissionID,
+ })
+ if err != nil {
+ return nil, err
+ }
+ return asynq.NewTask(string(TaskTypeCompileWasmToNativeExecutable), payload), nil
+}
+
+func (t *TaskPayloadCompileWasmToNativeExecutable) GameID() int { return t.TaskPayloadBase.GameID }
+func (t *TaskPayloadCompileWasmToNativeExecutable) UserID() int { return t.TaskPayloadBase.UserID }
+func (t *TaskPayloadCompileWasmToNativeExecutable) Code() string { return t.TaskPayloadBase.Code }
+
+type TaskPayloadRunTestcase struct {
+ TaskPayloadBase
+ SubmissionID int
+ TestcaseID int
+ Stdin string
+ Stdout string
+}
+
+func newTaskRunTestcase(
+ gameID int,
+ userID int,
+ code string,
+ submissionID int,
+ testcaseID int,
+ stdin string,
+ stdout string,
+) (*asynq.Task, error) {
+ payload, err := json.Marshal(TaskPayloadRunTestcase{
+ TaskPayloadBase: TaskPayloadBase{
+ GameID: gameID,
+ UserID: userID,
+ Code: code,
+ },
+ SubmissionID: submissionID,
+ TestcaseID: testcaseID,
+ Stdin: stdin,
+ Stdout: stdout,
})
if err != nil {
return nil, err
}
- return asynq.NewTask(TaskTypeExec, payload), nil
+ return asynq.NewTask(string(TaskTypeRunTestcase), payload), nil
+}
+
+func (t *TaskPayloadRunTestcase) GameID() int { return t.TaskPayloadBase.GameID }
+func (t *TaskPayloadRunTestcase) UserID() int { return t.TaskPayloadBase.UserID }
+func (t *TaskPayloadRunTestcase) Code() string { return t.TaskPayloadBase.Code }
+
+type TaskResult interface {
+ Type() TaskType
+ GameID() int
+}
+
+type TaskResultCreateSubmissionRecord struct {
+ TaskPayload *TaskPayloadCreateSubmissionRecord
+ SubmissionID int
+ Err error
}
+
+func (r *TaskResultCreateSubmissionRecord) Type() TaskType { return TaskTypeCreateSubmissionRecord }
+func (r *TaskResultCreateSubmissionRecord) GameID() int { return r.TaskPayload.GameID() }
+
+type TaskResultCompileSwiftToWasm struct {
+ TaskPayload *TaskPayloadCompileSwiftToWasm
+ Status string
+ Stdout string
+ Stderr string
+ Err error
+}
+
+func (r *TaskResultCompileSwiftToWasm) Type() TaskType { return TaskTypeCompileSwiftToWasm }
+func (r *TaskResultCompileSwiftToWasm) GameID() int { return r.TaskPayload.GameID() }
+
+type TaskResultCompileWasmToNativeExecutable struct {
+ TaskPayload *TaskPayloadCompileWasmToNativeExecutable
+ Status string
+ Stdout string
+ Stderr string
+ Err error
+}
+
+func (r *TaskResultCompileWasmToNativeExecutable) Type() TaskType {
+ return TaskTypeCompileWasmToNativeExecutable
+}
+func (r *TaskResultCompileWasmToNativeExecutable) GameID() int { return r.TaskPayload.GameID() }
+
+type TaskResultRunTestcase struct {
+ TaskPayload *TaskPayloadRunTestcase
+ Status string
+ Stdout string
+ Stderr string
+ Err error
+}
+
+func (r *TaskResultRunTestcase) Type() TaskType { return TaskTypeRunTestcase }
+func (r *TaskResultRunTestcase) GameID() int { return r.TaskPayload.GameID() }
diff --git a/backend/taskqueue/worker_server.go b/backend/taskqueue/worker_server.go
index 485d6d3..317f61c 100644
--- a/backend/taskqueue/worker_server.go
+++ b/backend/taskqueue/worker_server.go
@@ -7,27 +7,35 @@ import (
)
type WorkerServer struct {
- server *asynq.Server
- queries *db.Queries
- c chan string
+ server *asynq.Server
+ processor *processorWrapper
}
-func NewWorkerServer(redisAddr string, queries *db.Queries, c chan string) *WorkerServer {
+func NewWorkerServer(redisAddr string, queries *db.Queries) *WorkerServer {
+ server := asynq.NewServer(
+ asynq.RedisClientOpt{
+ Addr: redisAddr,
+ },
+ asynq.Config{},
+ )
+ processor := newProcessorWrapper(newProcessor(queries))
return &WorkerServer{
- server: asynq.NewServer(
- asynq.RedisClientOpt{
- Addr: redisAddr,
- },
- asynq.Config{},
- ),
- queries: queries,
- c: c,
+ server: server,
+ processor: processor,
}
}
func (s *WorkerServer) Run() error {
mux := asynq.NewServeMux()
- mux.Handle(TaskTypeExec, NewExecProcessor(s.queries, s.c))
+
+ mux.HandleFunc(string(TaskTypeCreateSubmissionRecord), s.processor.processTaskCreateSubmissionRecord)
+ mux.HandleFunc(string(TaskTypeCompileSwiftToWasm), s.processor.processTaskCompileSwiftToWasm)
+ mux.HandleFunc(string(TaskTypeCompileWasmToNativeExecutable), s.processor.processTaskCompileWasmToNativeExecutable)
+ mux.HandleFunc(string(TaskTypeRunTestcase), s.processor.processTaskRunTestcase)
return s.server.Run(mux)
}
+
+func (s *WorkerServer) Results() chan TaskResult {
+ return s.processor.results
+}
diff --git a/worker/exec.go b/worker/exec.go
index 2ef16fa..fb238c3 100644
--- a/worker/exec.go
+++ b/worker/exec.go
@@ -77,7 +77,7 @@ func convertCommandErrorToResultType(err error) string {
if err == context.DeadlineExceeded {
return resultTimeout
} else {
- return resultFailure
+ return resultRuntimeError
}
} else {
return resultSuccess
@@ -95,7 +95,7 @@ func execSwiftCompile(
if err := os.WriteFile(inPath, []byte(code), 0644); err != nil {
return swiftCompileResponseData{
- Result: resultInternalError,
+ Status: resultInternalError,
Stdout: "",
Stderr: err.Error(),
}
@@ -116,7 +116,7 @@ func execSwiftCompile(
)
return swiftCompileResponseData{
- Result: convertCommandErrorToResultType(err),
+ Status: convertCommandErrorToResultType(err),
Stdout: stdout,
Stderr: stderr,
}
@@ -148,7 +148,7 @@ func execWasmCompile(
)
return wasmCompileResponseData{
- Result: convertCommandErrorToResultType(err),
+ Status: convertCommandErrorToResultType(err),
Stdout: stdout,
Stderr: stderr,
}
@@ -179,7 +179,7 @@ func execTestRun(
)
return testRunResponseData{
- Result: convertCommandErrorToResultType(err),
+ Status: convertCommandErrorToResultType(err),
Stdout: stdout,
Stderr: stderr,
}
diff --git a/worker/models.go b/worker/models.go
index b838fe0..c60002c 100644
--- a/worker/models.go
+++ b/worker/models.go
@@ -7,7 +7,7 @@ import (
const (
resultSuccess = "success"
- resultFailure = "failure"
+ resultRuntimeError = "runtime_error"
resultTimeout = "timeout"
resultInternalError = "internal_error"
)
@@ -33,7 +33,7 @@ func (req *swiftCompileRequestData) validate() error {
}
type swiftCompileResponseData struct {
- Result string `json:"result"`
+ Status string `json:"status"`
Stdout string `json:"stdout"`
Stderr string `json:"stderr"`
}
@@ -44,7 +44,7 @@ type wasmCompileRequestData struct {
}
type wasmCompileResponseData struct {
- Result string `json:"result"`
+ Status string `json:"status"`
Stdout string `json:"stdout"`
Stderr string `json:"stderr"`
}
@@ -78,7 +78,7 @@ func (req *testRunRequestData) validate() error {
}
type testRunResponseData struct {
- Result string `json:"result"`
+ Status string `json:"status"`
Stdout string `json:"stdout"`
Stderr string `json:"stderr"`
}