aboutsummaryrefslogtreecommitdiffhomepage
path: root/backend/taskqueue
diff options
context:
space:
mode:
Diffstat (limited to 'backend/taskqueue')
-rw-r--r--backend/taskqueue/processor.go35
-rw-r--r--backend/taskqueue/tasks.go7
-rw-r--r--backend/taskqueue/worker_server.go12
3 files changed, 40 insertions, 14 deletions
diff --git a/backend/taskqueue/processor.go b/backend/taskqueue/processor.go
index 1d4c412..ca180e9 100644
--- a/backend/taskqueue/processor.go
+++ b/backend/taskqueue/processor.go
@@ -14,14 +14,14 @@ import (
)
type ExecProcessor struct {
- q *db.Queries
- c chan string
+ q *db.Queries
+ results chan TaskExecResult
}
-func NewExecProcessor(q *db.Queries, c chan string) *ExecProcessor {
+func NewExecProcessor(q *db.Queries, results chan TaskExecResult) *ExecProcessor {
return &ExecProcessor{
- q: q,
- c: c,
+ q: q,
+ results: results,
}
}
@@ -80,7 +80,10 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
if err != nil {
return fmt.Errorf("CreateTestcaseExecution failed: %v", err)
}
- p.c <- "compile_error"
+ p.results <- TaskExecResult{
+ Task: &payload,
+ Result: "compile_error",
+ }
return fmt.Errorf("swiftc failed: %v", resData.Stderr)
}
}
@@ -121,7 +124,10 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
if err != nil {
return fmt.Errorf("CreateTestcaseExecution failed: %v", err)
}
- p.c <- "compile_error"
+ p.results <- TaskExecResult{
+ Task: &payload,
+ Result: "compile_error",
+ }
return fmt.Errorf("wasmc failed: %v", resData.Stderr)
}
}
@@ -170,7 +176,10 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
if err != nil {
return fmt.Errorf("CreateTestcaseExecution failed: %v", err)
}
- p.c <- resData.Result
+ p.results <- TaskExecResult{
+ Task: &payload,
+ Result: resData.Result,
+ }
return fmt.Errorf("testrun failed: %v", resData.Stderr)
}
if !isTestcaseExecutionCorrect(testcase.Stdout, resData.Stdout) {
@@ -184,12 +193,18 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
if err != nil {
return fmt.Errorf("CreateTestcaseExecution failed: %v", err)
}
- p.c <- "wrong_answer"
+ p.results <- TaskExecResult{
+ Task: &payload,
+ Result: "wrong_answer",
+ }
return fmt.Errorf("testrun failed: %v", resData.Stdout)
}
}
- p.c <- "success"
+ p.results <- TaskExecResult{
+ Task: &payload,
+ Result: "success",
+ }
return nil
}
diff --git a/backend/taskqueue/tasks.go b/backend/taskqueue/tasks.go
index cd67948..db05553 100644
--- a/backend/taskqueue/tasks.go
+++ b/backend/taskqueue/tasks.go
@@ -27,3 +27,10 @@ func NewExecTask(gameID, userID int, code string) (*asynq.Task, error) {
}
return asynq.NewTask(TaskTypeExec, payload), nil
}
+
+type TaskExecResult struct {
+ Task *TaskExecPlayload
+ Result string
+ Stdout string
+ Stderr string
+}
diff --git a/backend/taskqueue/worker_server.go b/backend/taskqueue/worker_server.go
index 485d6d3..67803e2 100644
--- a/backend/taskqueue/worker_server.go
+++ b/backend/taskqueue/worker_server.go
@@ -9,10 +9,10 @@ import (
type WorkerServer struct {
server *asynq.Server
queries *db.Queries
- c chan string
+ results chan TaskExecResult
}
-func NewWorkerServer(redisAddr string, queries *db.Queries, c chan string) *WorkerServer {
+func NewWorkerServer(redisAddr string, queries *db.Queries) *WorkerServer {
return &WorkerServer{
server: asynq.NewServer(
asynq.RedisClientOpt{
@@ -21,13 +21,17 @@ func NewWorkerServer(redisAddr string, queries *db.Queries, c chan string) *Work
asynq.Config{},
),
queries: queries,
- c: c,
+ results: make(chan TaskExecResult),
}
}
func (s *WorkerServer) Run() error {
mux := asynq.NewServeMux()
- mux.Handle(TaskTypeExec, NewExecProcessor(s.queries, s.c))
+ mux.Handle(TaskTypeExec, NewExecProcessor(s.queries, s.results))
return s.server.Run(mux)
}
+
+func (s *WorkerServer) Results() chan TaskExecResult {
+ return s.results
+}