aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--backend/game/hub.go54
-rw-r--r--backend/taskqueue/processor.go253
-rw-r--r--backend/taskqueue/tasks.go180
-rw-r--r--backend/taskqueue/worker_server.go32
4 files changed, 372 insertions, 147 deletions
diff --git a/backend/game/hub.go b/backend/game/hub.go
index 27a9847..58eb180 100644
--- a/backend/game/hub.go
+++ b/backend/game/hub.go
@@ -33,7 +33,7 @@ type gameHub struct {
watchers map[*watcherClient]bool
registerWatcher chan *watcherClient
unregisterWatcher chan *watcherClient
- taskResults chan taskqueue.TaskExecResult
+ taskResults chan taskqueue.TaskResult
}
func newGameHub(ctx context.Context, game *game, q *db.Queries, taskQueue *taskqueue.Queue) *gameHub {
@@ -49,7 +49,7 @@ func newGameHub(ctx context.Context, game *game, q *db.Queries, taskQueue *taskq
watchers: make(map[*watcherClient]bool),
registerWatcher: make(chan *watcherClient),
unregisterWatcher: make(chan *watcherClient),
- taskResults: make(chan taskqueue.TaskExecResult),
+ taskResults: make(chan taskqueue.TaskResult),
}
}
@@ -161,7 +161,13 @@ func (hub *gameHub) run() {
// TODO: assert game state is gaming
log.Printf("submit: %v", message.message)
code := msg.Data.Code
- task, err := taskqueue.NewExecTask(hub.game.gameID, message.client.playerID, code)
+ codeSize := len(code) // TODO: exclude whitespaces.
+ task, err := taskqueue.NewTaskCreateSubmissionRecord(
+ hub.game.gameID,
+ message.client.playerID,
+ code,
+ codeSize,
+ )
if err != nil {
log.Fatalf("failed to create task: %v", err)
}
@@ -201,19 +207,31 @@ func (hub *gameHub) run() {
func (hub *gameHub) processTaskResults() {
for taskResult := range hub.taskResults {
- for player := range hub.players {
- if player.playerID != taskResult.Task.UserID {
- continue
- }
- player.s2cMessages <- &playerMessageS2CExecResult{
- Type: playerMessageTypeS2CExecResult,
- Data: playerMessageS2CExecResultPayload{
- Score: nil,
- Status: api.GamePlayerMessageS2CExecResultPayloadStatus(taskResult.Status),
- },
+ switch taskResult := taskResult.(type) {
+ case *taskqueue.TaskResultCreateSubmissionRecord:
+ // todo
+ case *taskqueue.TaskResultCompileSwiftToWasm:
+ // todo
+ case *taskqueue.TaskResultCompileWasmToNativeExecutable:
+ // todo
+ case *taskqueue.TaskResultRunTestcase:
+ // todo
+ for player := range hub.players {
+ if player.playerID != taskResult.TaskPayload.UserID() {
+ continue
+ }
+ player.s2cMessages <- &playerMessageS2CExecResult{
+ Type: playerMessageTypeS2CExecResult,
+ Data: playerMessageS2CExecResultPayload{
+ Score: nil,
+ Status: api.GamePlayerMessageS2CExecResultPayloadStatus(taskResult.Status),
+ },
+ }
}
+ // broadcast to watchers
+ default:
+ panic("unexpected task result type")
}
- // broadcast to watchers
}
}
@@ -270,10 +288,10 @@ type GameHubs struct {
hubs map[int]*gameHub
q *db.Queries
taskQueue *taskqueue.Queue
- taskResults chan taskqueue.TaskExecResult
+ taskResults chan taskqueue.TaskResult
}
-func NewGameHubs(q *db.Queries, taskQueue *taskqueue.Queue, taskResults chan taskqueue.TaskExecResult) *GameHubs {
+func NewGameHubs(q *db.Queries, taskQueue *taskqueue.Queue, taskResults chan taskqueue.TaskResult) *GameHubs {
return &GameHubs{
hubs: make(map[int]*gameHub),
q: q,
@@ -339,9 +357,9 @@ func (hubs *GameHubs) Run() {
}
for taskResult := range hubs.taskResults {
- hub := hubs.getHub(taskResult.Task.GameID)
+ hub := hubs.getHub(taskResult.GameID())
if hub == nil {
- log.Printf("no such game: %d", taskResult.Task.GameID)
+ log.Printf("no such game: %d", taskResult.GameID())
continue
}
hub.taskResults <- taskResult
diff --git a/backend/taskqueue/processor.go b/backend/taskqueue/processor.go
index b080c46..556bd78 100644
--- a/backend/taskqueue/processor.go
+++ b/backend/taskqueue/processor.go
@@ -13,125 +13,169 @@ import (
"github.com/nsfisis/iosdc-japan-2024-albatross/backend/db"
)
-type ExecProcessor struct {
+type processor struct {
q *db.Queries
- results chan TaskExecResult
+ results chan TaskResult
}
-func NewExecProcessor(q *db.Queries, results chan TaskExecResult) *ExecProcessor {
- return &ExecProcessor{
+func newProcessor(q *db.Queries) *processor {
+ return &processor{
q: q,
- results: results,
+ results: make(chan TaskResult),
}
}
-func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
- var payload TaskExecPlayload
+func (p *processor) processTaskCreateSubmissionRecord(ctx context.Context, t *asynq.Task) error {
+ var payload TaskPayloadCreateSubmissionRecord
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
+ // TODO: send result
return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
}
// TODO: upsert
// Create submission record.
submissionID, err := p.q.CreateSubmission(ctx, db.CreateSubmissionParams{
- GameID: int32(payload.GameID),
- UserID: int32(payload.UserID),
- Code: payload.Code,
- CodeSize: int32(len(payload.Code)), // TODO: exclude whitespaces.
+ GameID: int32(payload.GameID()),
+ UserID: int32(payload.UserID()),
+ Code: payload.Code(),
+ CodeSize: int32(payload.CodeSize),
})
if err != nil {
+ // TODO: send result
return fmt.Errorf("CreateSubmission failed: %v", err)
}
- {
- type swiftcRequestData struct {
- MaxDuration int `json:"max_duration_ms"`
- Code string `json:"code"`
- }
- type swiftcResponseData struct {
- Status string `json:"status"`
- Stdout string `json:"stdout"`
- Stderr string `json:"stderr"`
- }
- reqData := swiftcRequestData{
- MaxDuration: 5000,
- Code: payload.Code,
- }
- reqJson, err := json.Marshal(reqData)
- if err != nil {
- return fmt.Errorf("json.Marshal failed: %v", err)
- }
- res, err := http.Post("http://worker:80/api/swiftc", "application/json", bytes.NewBuffer(reqJson))
+ p.results <- &TaskResultCreateSubmissionRecord{
+ TaskPayload: &payload,
+ SubmissionID: int(submissionID),
+ }
+ return nil
+}
+
+func (p *processor) processTaskCompileSwiftToWasm(ctx context.Context, t *asynq.Task) error {
+ var payload TaskPayloadCompileSwiftToWasm
+ if err := json.Unmarshal(t.Payload(), &payload); err != nil {
+ // TODO: send result
+ return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
+ }
+
+ type swiftcRequestData struct {
+ MaxDuration int `json:"max_duration_ms"`
+ Code string `json:"code"`
+ }
+ type swiftcResponseData struct {
+ Status string `json:"status"`
+ Stdout string `json:"stdout"`
+ Stderr string `json:"stderr"`
+ }
+ reqData := swiftcRequestData{
+ MaxDuration: 5000,
+ Code: payload.Code(),
+ }
+ reqJson, err := json.Marshal(reqData)
+ if err != nil {
+ // TODO: send result
+ return fmt.Errorf("json.Marshal failed: %v", err)
+ }
+ res, err := http.Post("http://worker:80/api/swiftc", "application/json", bytes.NewBuffer(reqJson))
+ if err != nil {
+ // TODO: send result
+ return fmt.Errorf("http.Post failed: %v", err)
+ }
+ resData := swiftcResponseData{}
+ if err := json.NewDecoder(res.Body).Decode(&resData); err != nil {
+ // TODO: send result
+ return fmt.Errorf("json.Decode failed: %v", err)
+ }
+ if resData.Status != "success" {
+ err := p.q.CreateSubmissionResult(ctx, db.CreateSubmissionResultParams{
+ SubmissionID: int32(payload.SubmissionID),
+ Status: "compile_error",
+ Stdout: resData.Stdout,
+ Stderr: resData.Stderr,
+ })
if err != nil {
- return fmt.Errorf("http.Post failed: %v", err)
+ // TODO: send result
+ return fmt.Errorf("CreateTestcaseResult failed: %v", err)
}
- resData := swiftcResponseData{}
- if err := json.NewDecoder(res.Body).Decode(&resData); err != nil {
- return fmt.Errorf("json.Decode failed: %v", err)
- }
- if resData.Status != "success" {
- err := p.q.CreateSubmissionResult(ctx, db.CreateSubmissionResultParams{
- SubmissionID: submissionID,
- Status: "compile_error",
- Stdout: resData.Stdout,
- Stderr: resData.Stderr,
- })
- if err != nil {
- return fmt.Errorf("CreateTestcaseResult failed: %v", err)
- }
- p.results <- TaskExecResult{
- Task: &payload,
- Status: "compile_error",
- }
- return fmt.Errorf("swiftc failed: %v", resData.Stderr)
+ p.results <- &TaskResultCompileSwiftToWasm{
+ TaskPayload: &payload,
+ Status: "compile_error",
}
+ return fmt.Errorf("swiftc failed: %v", resData.Stderr)
}
- {
- type wasmcRequestData struct {
- MaxDuration int `json:"max_duration_ms"`
- Code string `json:"code"`
- }
- type wasmcResponseData struct {
- Status string `json:"status"`
- Stdout string `json:"stdout"`
- Stderr string `json:"stderr"`
- }
- reqData := wasmcRequestData{
- MaxDuration: 5000,
- Code: payload.Code,
- }
- reqJson, err := json.Marshal(reqData)
- if err != nil {
- return fmt.Errorf("json.Marshal failed: %v", err)
- }
- res, err := http.Post("http://worker:80/api/wasmc", "application/json", bytes.NewBuffer(reqJson))
+
+ // TODO: send result
+ return nil
+}
+
+func (p *processor) processTaskCompileWasmToNativeExecutable(ctx context.Context, t *asynq.Task) error {
+ var payload TaskPayloadCompileWasmToNativeExecutable
+ if err := json.Unmarshal(t.Payload(), &payload); err != nil {
+ // TODO: send result
+ return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
+ }
+
+ type wasmcRequestData struct {
+ MaxDuration int `json:"max_duration_ms"`
+ Code string `json:"code"`
+ }
+ type wasmcResponseData struct {
+ Status string `json:"status"`
+ Stdout string `json:"stdout"`
+ Stderr string `json:"stderr"`
+ }
+ reqData := wasmcRequestData{
+ MaxDuration: 5000,
+ Code: payload.Code(),
+ }
+ reqJson, err := json.Marshal(reqData)
+ if err != nil {
+ // TODO: send result
+ return fmt.Errorf("json.Marshal failed: %v", err)
+ }
+ res, err := http.Post("http://worker:80/api/wasmc", "application/json", bytes.NewBuffer(reqJson))
+ if err != nil {
+ // TODO: send result
+ return fmt.Errorf("http.Post failed: %v", err)
+ }
+ resData := wasmcResponseData{}
+ if err := json.NewDecoder(res.Body).Decode(&resData); err != nil {
+ // TODO: send result
+ return fmt.Errorf("json.Decode failed: %v", err)
+ }
+ if resData.Status != "success" {
+ err := p.q.CreateSubmissionResult(ctx, db.CreateSubmissionResultParams{
+ SubmissionID: int32(payload.SubmissionID),
+ Status: "compile_error",
+ Stdout: resData.Stdout,
+ Stderr: resData.Stderr,
+ })
if err != nil {
- return fmt.Errorf("http.Post failed: %v", err)
+ // TODO: send result
+ return fmt.Errorf("CreateTestcaseResult failed: %v", err)
}
- resData := wasmcResponseData{}
- if err := json.NewDecoder(res.Body).Decode(&resData); err != nil {
- return fmt.Errorf("json.Decode failed: %v", err)
- }
- if resData.Status != "success" {
- err := p.q.CreateSubmissionResult(ctx, db.CreateSubmissionResultParams{
- SubmissionID: submissionID,
- Status: "compile_error",
- Stdout: resData.Stdout,
- Stderr: resData.Stderr,
- })
- if err != nil {
- return fmt.Errorf("CreateTestcaseResult failed: %v", err)
- }
- p.results <- TaskExecResult{
- Task: &payload,
- Status: "compile_error",
- }
- return fmt.Errorf("wasmc failed: %v", resData.Stderr)
+ p.results <- &TaskResultCompileWasmToNativeExecutable{
+ TaskPayload: &payload,
+ Status: "compile_error",
}
+ return fmt.Errorf("wasmc failed: %v", resData.Stderr)
+ }
+
+ // TODO: send result
+ return nil
+}
+
+func (p *processor) processTaskRunTestcase(ctx context.Context, t *asynq.Task) error {
+ var payload TaskPayloadRunTestcase
+ if err := json.Unmarshal(t.Payload(), &payload); err != nil {
+ // TODO: send result
+ return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
}
- testcases, err := p.q.ListTestcasesByGameID(ctx, int32(payload.GameID))
+ testcases, err := p.q.ListTestcasesByGameID(ctx, int32(payload.GameID()))
if err != nil {
+ // TODO: send result
return fmt.Errorf("ListTestcasesByGameID failed: %v", err)
}
@@ -148,60 +192,65 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
}
reqData := testrunRequestData{
MaxDuration: 5000,
- Code: payload.Code,
+ Code: payload.Code(),
Stdin: testcase.Stdin,
}
reqJson, err := json.Marshal(reqData)
if err != nil {
+ // TODO: send result
return fmt.Errorf("json.Marshal failed: %v", err)
}
res, err := http.Post("http://worker:80/api/testrun", "application/json", bytes.NewBuffer(reqJson))
if err != nil {
+ // TODO: send result
return fmt.Errorf("http.Post failed: %v", err)
}
resData := testrunResponseData{}
if err := json.NewDecoder(res.Body).Decode(&resData); err != nil {
+ // TODO: send result
return fmt.Errorf("json.Decode failed: %v", err)
}
if resData.Status != "success" {
err := p.q.CreateTestcaseResult(ctx, db.CreateTestcaseResultParams{
- SubmissionID: submissionID,
+ SubmissionID: int32(payload.SubmissionID),
TestcaseID: testcase.TestcaseID,
Status: resData.Status,
Stdout: resData.Stdout,
Stderr: resData.Stderr,
})
if err != nil {
+ // TODO: send result
return fmt.Errorf("CreateTestcaseResult failed: %v", err)
}
- p.results <- TaskExecResult{
- Task: &payload,
- Status: resData.Status,
+ p.results <- &TaskResultRunTestcase{
+ TaskPayload: &payload,
+ Status: resData.Status,
}
return fmt.Errorf("testrun failed: %v", resData.Stderr)
}
if !isTestcaseResultCorrect(testcase.Stdout, resData.Stdout) {
err := p.q.CreateTestcaseResult(ctx, db.CreateTestcaseResultParams{
- SubmissionID: submissionID,
+ SubmissionID: int32(payload.SubmissionID),
TestcaseID: testcase.TestcaseID,
Status: "wrong_answer",
Stdout: resData.Stdout,
Stderr: resData.Stderr,
})
if err != nil {
+ // TODO: send result
return fmt.Errorf("CreateTestcaseResult failed: %v", err)
}
- p.results <- TaskExecResult{
- Task: &payload,
- Status: "wrong_answer",
+ p.results <- &TaskResultRunTestcase{
+ TaskPayload: &payload,
+ Status: "wrong_answer",
}
return fmt.Errorf("testrun failed: %v", resData.Stdout)
}
}
- p.results <- TaskExecResult{
- Task: &payload,
- Status: "success",
+ p.results <- &TaskResultRunTestcase{
+ TaskPayload: &payload,
+ Status: "success",
}
return nil
}
diff --git a/backend/taskqueue/tasks.go b/backend/taskqueue/tasks.go
index 5c518f4..679b6a0 100644
--- a/backend/taskqueue/tasks.go
+++ b/backend/taskqueue/tasks.go
@@ -6,31 +6,185 @@ import (
"github.com/hibiken/asynq"
)
+type TaskType string
+
const (
- TaskTypeExec = "exec"
+ TaskTypeCreateSubmissionRecord TaskType = "create_submission_record"
+ TaskTypeCompileSwiftToWasm TaskType = "compile_swift_to_wasm"
+ TaskTypeCompileWasmToNativeExecutable TaskType = "compile_wasm_to_native_executable"
+ TaskTypeRunTestcase TaskType = "run_testcase"
)
-type TaskExecPlayload struct {
+type TaskPayloadBase struct {
GameID int
UserID int
Code string
}
-func NewExecTask(gameID, userID int, code string) (*asynq.Task, error) {
- payload, err := json.Marshal(TaskExecPlayload{
- GameID: gameID,
- UserID: userID,
- Code: code,
+type TaskPayloadCreateSubmissionRecord struct {
+ TaskPayloadBase
+ CodeSize int
+}
+
+func NewTaskCreateSubmissionRecord(
+ gameID int,
+ userID int,
+ code string,
+ codeSize int,
+) (*asynq.Task, error) {
+ payload, err := json.Marshal(TaskPayloadCreateSubmissionRecord{
+ TaskPayloadBase: TaskPayloadBase{
+ GameID: gameID,
+ UserID: userID,
+ Code: code,
+ },
+ CodeSize: codeSize,
+ })
+ if err != nil {
+ return nil, err
+ }
+ return asynq.NewTask(string(TaskTypeCreateSubmissionRecord), payload), nil
+}
+
+func (t *TaskPayloadCreateSubmissionRecord) GameID() int { return t.TaskPayloadBase.GameID }
+func (t *TaskPayloadCreateSubmissionRecord) UserID() int { return t.TaskPayloadBase.UserID }
+func (t *TaskPayloadCreateSubmissionRecord) Code() string { return t.TaskPayloadBase.Code }
+
+type TaskPayloadCompileSwiftToWasm struct {
+ TaskPayloadBase
+ SubmissionID int
+}
+
+func NewTaskCompileSwiftToWasm(
+ gameID int,
+ userID int,
+ code string,
+ submissionID int,
+) (*asynq.Task, error) {
+ payload, err := json.Marshal(TaskPayloadCompileSwiftToWasm{
+ TaskPayloadBase: TaskPayloadBase{
+ GameID: gameID,
+ UserID: userID,
+ Code: code,
+ },
+ SubmissionID: submissionID,
+ })
+ if err != nil {
+ return nil, err
+ }
+ return asynq.NewTask(string(TaskTypeCompileSwiftToWasm), payload), nil
+}
+
+func (t *TaskPayloadCompileSwiftToWasm) GameID() int { return t.TaskPayloadBase.GameID }
+func (t *TaskPayloadCompileSwiftToWasm) UserID() int { return t.TaskPayloadBase.UserID }
+func (t *TaskPayloadCompileSwiftToWasm) Code() string { return t.TaskPayloadBase.Code }
+
+type TaskPayloadCompileWasmToNativeExecutable struct {
+ TaskPayloadBase
+ SubmissionID int
+}
+
+func NewTaskCompileWasmToNativeExecutable(
+ gameID int,
+ userID int,
+ code string,
+ submissionID int,
+) (*asynq.Task, error) {
+ payload, err := json.Marshal(TaskPayloadCompileWasmToNativeExecutable{
+ TaskPayloadBase: TaskPayloadBase{
+ GameID: gameID,
+ UserID: userID,
+ Code: code,
+ },
+ SubmissionID: submissionID,
+ })
+ if err != nil {
+ return nil, err
+ }
+ return asynq.NewTask(string(TaskTypeCompileWasmToNativeExecutable), payload), nil
+}
+
+func (t *TaskPayloadCompileWasmToNativeExecutable) GameID() int { return t.TaskPayloadBase.GameID }
+func (t *TaskPayloadCompileWasmToNativeExecutable) UserID() int { return t.TaskPayloadBase.UserID }
+func (t *TaskPayloadCompileWasmToNativeExecutable) Code() string { return t.TaskPayloadBase.Code }
+
+type TaskPayloadRunTestcase struct {
+ TaskPayloadBase
+ SubmissionID int
+ TestcaseID int
+}
+
+func NewTaskRunTestcase(
+ gameID int,
+ userID int,
+ code string,
+ submissionID int,
+ testcaseID int,
+) (*asynq.Task, error) {
+ payload, err := json.Marshal(TaskPayloadRunTestcase{
+ TaskPayloadBase: TaskPayloadBase{
+ GameID: gameID,
+ UserID: userID,
+ Code: code,
+ },
+ SubmissionID: submissionID,
+ TestcaseID: testcaseID,
})
if err != nil {
return nil, err
}
- return asynq.NewTask(TaskTypeExec, payload), nil
+ return asynq.NewTask(string(TaskTypeRunTestcase), payload), nil
+}
+
+func (t *TaskPayloadRunTestcase) GameID() int { return t.TaskPayloadBase.GameID }
+func (t *TaskPayloadRunTestcase) UserID() int { return t.TaskPayloadBase.UserID }
+func (t *TaskPayloadRunTestcase) Code() string { return t.TaskPayloadBase.Code }
+
+type TaskResult interface {
+ Type() TaskType
+ GameID() int
}
-type TaskExecResult struct {
- Task *TaskExecPlayload
- Status string
- Stdout string
- Stderr string
+type TaskResultCreateSubmissionRecord struct {
+ TaskPayload *TaskPayloadCreateSubmissionRecord
+ SubmissionID int
+ Err error
}
+
+func (r *TaskResultCreateSubmissionRecord) Type() TaskType { return TaskTypeCreateSubmissionRecord }
+func (r *TaskResultCreateSubmissionRecord) GameID() int { return r.TaskPayload.GameID() }
+
+type TaskResultCompileSwiftToWasm struct {
+ TaskPayload *TaskPayloadCompileSwiftToWasm
+ Status string
+ Stdout string
+ Stderr string
+ Err error
+}
+
+func (r *TaskResultCompileSwiftToWasm) Type() TaskType { return TaskTypeCompileSwiftToWasm }
+func (r *TaskResultCompileSwiftToWasm) GameID() int { return r.TaskPayload.GameID() }
+
+type TaskResultCompileWasmToNativeExecutable struct {
+ TaskPayload *TaskPayloadCompileWasmToNativeExecutable
+ Status string
+ Stdout string
+ Stderr string
+ Err error
+}
+
+func (r *TaskResultCompileWasmToNativeExecutable) Type() TaskType {
+ return TaskTypeCompileWasmToNativeExecutable
+}
+func (r *TaskResultCompileWasmToNativeExecutable) GameID() int { return r.TaskPayload.GameID() }
+
+type TaskResultRunTestcase struct {
+ TaskPayload *TaskPayloadRunTestcase
+ Status string
+ Stdout string
+ Stderr string
+ Err error
+}
+
+func (r *TaskResultRunTestcase) Type() TaskType { return TaskTypeRunTestcase }
+func (r *TaskResultRunTestcase) GameID() int { return r.TaskPayload.GameID() }
diff --git a/backend/taskqueue/worker_server.go b/backend/taskqueue/worker_server.go
index 67803e2..6dc65d8 100644
--- a/backend/taskqueue/worker_server.go
+++ b/backend/taskqueue/worker_server.go
@@ -7,31 +7,35 @@ import (
)
type WorkerServer struct {
- server *asynq.Server
- queries *db.Queries
- results chan TaskExecResult
+ server *asynq.Server
+ processor *processor
}
func NewWorkerServer(redisAddr string, queries *db.Queries) *WorkerServer {
+ server := asynq.NewServer(
+ asynq.RedisClientOpt{
+ Addr: redisAddr,
+ },
+ asynq.Config{},
+ )
+ processor := newProcessor(queries)
return &WorkerServer{
- server: asynq.NewServer(
- asynq.RedisClientOpt{
- Addr: redisAddr,
- },
- asynq.Config{},
- ),
- queries: queries,
- results: make(chan TaskExecResult),
+ server: server,
+ processor: processor,
}
}
func (s *WorkerServer) Run() error {
mux := asynq.NewServeMux()
- mux.Handle(TaskTypeExec, NewExecProcessor(s.queries, s.results))
+
+ mux.HandleFunc(string(TaskTypeCreateSubmissionRecord), s.processor.processTaskCreateSubmissionRecord)
+ mux.HandleFunc(string(TaskTypeCompileSwiftToWasm), s.processor.processTaskCompileSwiftToWasm)
+ mux.HandleFunc(string(TaskTypeCompileWasmToNativeExecutable), s.processor.processTaskCompileWasmToNativeExecutable)
+ mux.HandleFunc(string(TaskTypeRunTestcase), s.processor.processTaskRunTestcase)
return s.server.Run(mux)
}
-func (s *WorkerServer) Results() chan TaskExecResult {
- return s.results
+func (s *WorkerServer) Results() chan TaskResult {
+ return s.processor.results
}