aboutsummaryrefslogtreecommitdiffhomepage
path: root/backend/taskqueue
diff options
context:
space:
mode:
authornsfisis <nsfisis@gmail.com>2025-03-04 22:55:01 +0900
committernsfisis <nsfisis@gmail.com>2025-03-08 10:12:44 +0900
commit1e6df136d8202c8adf65948527f4c3e7583b338c (patch)
tree7c82476f6bbbc71d72ab7e71e39559eca197fd95 /backend/taskqueue
parent54316868c3bec1ff9b04643dfe6c13cf56bf3246 (diff)
downloadphperkaigi-2025-albatross-1e6df136d8202c8adf65948527f4c3e7583b338c.tar.gz
phperkaigi-2025-albatross-1e6df136d8202c8adf65948527f4c3e7583b338c.tar.zst
phperkaigi-2025-albatross-1e6df136d8202c8adf65948527f4c3e7583b338c.zip
websocket to polling
Diffstat (limited to 'backend/taskqueue')
-rw-r--r--backend/taskqueue/processor.go165
-rw-r--r--backend/taskqueue/processor_wrapper.go66
-rw-r--r--backend/taskqueue/queue.go65
-rw-r--r--backend/taskqueue/tasks.go162
-rw-r--r--backend/taskqueue/worker_server.go9
5 files changed, 28 insertions, 439 deletions
diff --git a/backend/taskqueue/processor.go b/backend/taskqueue/processor.go
index 222c586..0dfaf68 100644
--- a/backend/taskqueue/processor.go
+++ b/backend/taskqueue/processor.go
@@ -6,180 +6,45 @@ import (
"encoding/json"
"fmt"
"net/http"
-
- "github.com/nsfisis/phperkaigi-2025-albatross/backend/auth"
- "github.com/nsfisis/phperkaigi-2025-albatross/backend/db"
)
-type processor struct {
- q *db.Queries
-}
+type processor struct{}
-func newProcessor(q *db.Queries) processor {
- return processor{
- q: q,
- }
+func newProcessor() processor {
+ return processor{}
}
-func (p *processor) doProcessTaskCreateSubmissionRecord(
- ctx context.Context,
- payload *TaskPayloadCreateSubmissionRecord,
-) (*TaskResultCreateSubmissionRecord, error) {
- // TODO: upsert
- submissionID, err := p.q.CreateSubmission(ctx, db.CreateSubmissionParams{
- GameID: int32(payload.GameID()),
- UserID: int32(payload.UserID()),
- Code: payload.Code,
- CodeSize: int32(payload.CodeSize),
- CodeHash: string(payload.CodeHash()),
- })
- if err != nil {
- return nil, err
- }
-
- return &TaskResultCreateSubmissionRecord{
- TaskPayload: payload,
- SubmissionID: int(submissionID),
- }, nil
+type testrunRequestData struct {
+ Code string `json:"code"`
+ Stdin string `json:"stdin"`
+ MaxDuration int `json:"max_duration_ms"`
}
-func (p *processor) doProcessTaskCompileSwiftToWasm(
- _ context.Context,
- payload *TaskPayloadCompileSwiftToWasm,
-) (*TaskResultCompileSwiftToWasm, error) {
- type swiftcRequestData struct {
- MaxDuration int `json:"max_duration_ms"`
- Code string `json:"code"`
- CodeHash string `json:"code_hash"`
- }
- type swiftcResponseData struct {
- Status string `json:"status"`
- Stdout string `json:"stdout"`
- Stderr string `json:"stderr"`
- }
- reqData := swiftcRequestData{
- MaxDuration: 10000,
- Code: payload.Code,
- CodeHash: string(payload.CodeHash()),
- }
- reqJSON, err := json.Marshal(reqData)
- if err != nil {
- return nil, fmt.Errorf("json.Marshal failed: %v", err)
- }
- req, err := http.NewRequest("POST", "http://worker:80/api/swiftc", bytes.NewBuffer(reqJSON))
- if err != nil {
- return nil, fmt.Errorf("http.NewRequest failed: %v", err)
- }
- req.Header.Set("Content-Type", "application/json")
- jwt, err := auth.NewAnonymousJWT()
- if err != nil {
- return nil, fmt.Errorf("auth.NewAnonymousJWT failed: %v", err)
- }
- req.Header.Set("Authorization", "Bearer "+jwt)
-
- client := &http.Client{}
- res, err := client.Do(req)
- if err != nil {
- return nil, fmt.Errorf("client.Do failed: %v", err)
- }
- defer res.Body.Close()
-
- resData := swiftcResponseData{}
- if err := json.NewDecoder(res.Body).Decode(&resData); err != nil {
- return nil, fmt.Errorf("json.Decode failed: %v", err)
- }
- return &TaskResultCompileSwiftToWasm{
- TaskPayload: payload,
- Status: resData.Status,
- Stdout: resData.Stdout,
- Stderr: resData.Stderr,
- }, nil
-}
-
-func (p *processor) doProcessTaskCompileWasmToNativeExecutable(
- _ context.Context,
- payload *TaskPayloadCompileWasmToNativeExecutable,
-) (*TaskResultCompileWasmToNativeExecutable, error) {
- type wasmcRequestData struct {
- MaxDuration int `json:"max_duration_ms"`
- CodeHash string `json:"code_hash"`
- }
- type wasmcResponseData struct {
- Status string `json:"status"`
- Stdout string `json:"stdout"`
- Stderr string `json:"stderr"`
- }
- reqData := wasmcRequestData{
- MaxDuration: 10000,
- CodeHash: string(payload.CodeHash()),
- }
- reqJSON, err := json.Marshal(reqData)
- if err != nil {
- return nil, fmt.Errorf("json.Marshal failed: %v", err)
- }
- req, err := http.NewRequest("POST", "http://worker:80/api/wasmc", bytes.NewBuffer(reqJSON))
- if err != nil {
- return nil, fmt.Errorf("http.NewRequest failed: %v", err)
- }
- req.Header.Set("Content-Type", "application/json")
- jwt, err := auth.NewAnonymousJWT()
- if err != nil {
- return nil, fmt.Errorf("auth.NewAnonymousJWT failed: %v", err)
- }
- req.Header.Set("Authorization", "Bearer "+jwt)
-
- client := &http.Client{}
- res, err := client.Do(req)
- if err != nil {
- return nil, fmt.Errorf("client.Do failed: %v", err)
- }
- defer res.Body.Close()
-
- resData := wasmcResponseData{}
- if err := json.NewDecoder(res.Body).Decode(&resData); err != nil {
- return nil, fmt.Errorf("json.Decode failed: %v", err)
- }
- return &TaskResultCompileWasmToNativeExecutable{
- TaskPayload: payload,
- Status: resData.Status,
- Stdout: resData.Stdout,
- Stderr: resData.Stderr,
- }, nil
+type testrunResponseData struct {
+ Status string `json:"status"`
+ Stdout string `json:"stdout"`
+ Stderr string `json:"stderr"`
}
func (p *processor) doProcessTaskRunTestcase(
_ context.Context,
payload *TaskPayloadRunTestcase,
) (*TaskResultRunTestcase, error) {
- type testrunRequestData struct {
- MaxDuration int `json:"max_duration_ms"`
- CodeHash string `json:"code_hash"`
- Stdin string `json:"stdin"`
- }
- type testrunResponseData struct {
- Status string `json:"status"`
- Stdout string `json:"stdout"`
- Stderr string `json:"stderr"`
- }
reqData := testrunRequestData{
- MaxDuration: 5000,
- CodeHash: string(payload.CodeHash()),
+ Code: payload.Code,
Stdin: payload.Stdin,
+ MaxDuration: 5000,
}
reqJSON, err := json.Marshal(reqData)
if err != nil {
return nil, fmt.Errorf("json.Marshal failed: %v", err)
}
- req, err := http.NewRequest("POST", "http://worker:80/api/testrun", bytes.NewBuffer(reqJSON))
+ req, err := http.NewRequest("POST", "http://worker:80/exec", bytes.NewBuffer(reqJSON))
if err != nil {
return nil, fmt.Errorf("http.NewRequest failed: %v", err)
}
req.Header.Set("Content-Type", "application/json")
- jwt, err := auth.NewAnonymousJWT()
- if err != nil {
- return nil, fmt.Errorf("auth.NewAnonymousJWT failed: %v", err)
- }
- req.Header.Set("Authorization", "Bearer "+jwt)
+ req.Header.Set("Accept", "application/json")
client := &http.Client{}
res, err := client.Do(req)
diff --git a/backend/taskqueue/processor_wrapper.go b/backend/taskqueue/processor_wrapper.go
index b1fbd16..e6ddef3 100644
--- a/backend/taskqueue/processor_wrapper.go
+++ b/backend/taskqueue/processor_wrapper.go
@@ -23,72 +23,6 @@ func newProcessorWrapper(impl processor) *processorWrapper {
}
}
-func (p *processorWrapper) processTaskCompileSwiftToWasm(ctx context.Context, t *asynq.Task) error {
- var payload TaskPayloadCompileSwiftToWasm
- if err := json.Unmarshal(t.Payload(), &payload); err != nil {
- err := fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
- p.results <- &TaskResultCompileSwiftToWasm{Err: err}
- return err
- }
-
- result, err := p.impl.doProcessTaskCompileSwiftToWasm(ctx, &payload)
- if err != nil {
- retryCount, _ := asynq.GetRetryCount(ctx)
- maxRetry, _ := asynq.GetMaxRetry(ctx)
- isRecoverable := !errors.Is(err, asynq.SkipRetry) && retryCount < maxRetry
- if !isRecoverable {
- p.results <- &TaskResultCompileSwiftToWasm{Err: err}
- }
- return err
- }
- p.results <- result
- return nil
-}
-
-func (p *processorWrapper) processTaskCompileWasmToNativeExecutable(ctx context.Context, t *asynq.Task) error {
- var payload TaskPayloadCompileWasmToNativeExecutable
- if err := json.Unmarshal(t.Payload(), &payload); err != nil {
- err := fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
- p.results <- &TaskResultCompileWasmToNativeExecutable{Err: err}
- return err
- }
-
- result, err := p.impl.doProcessTaskCompileWasmToNativeExecutable(ctx, &payload)
- if err != nil {
- retryCount, _ := asynq.GetRetryCount(ctx)
- maxRetry, _ := asynq.GetMaxRetry(ctx)
- isRecoverable := !errors.Is(err, asynq.SkipRetry) && retryCount < maxRetry
- if !isRecoverable {
- p.results <- &TaskResultCompileWasmToNativeExecutable{Err: err}
- }
- return err
- }
- p.results <- result
- return nil
-}
-
-func (p *processorWrapper) processTaskCreateSubmissionRecord(ctx context.Context, t *asynq.Task) error {
- var payload TaskPayloadCreateSubmissionRecord
- if err := json.Unmarshal(t.Payload(), &payload); err != nil {
- err := fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
- p.results <- &TaskResultCreateSubmissionRecord{Err: err}
- return err
- }
-
- result, err := p.impl.doProcessTaskCreateSubmissionRecord(ctx, &payload)
- if err != nil {
- retryCount, _ := asynq.GetRetryCount(ctx)
- maxRetry, _ := asynq.GetMaxRetry(ctx)
- isRecoverable := !errors.Is(err, asynq.SkipRetry) && retryCount < maxRetry
- if !isRecoverable {
- p.results <- &TaskResultCreateSubmissionRecord{Err: err}
- }
- return err
- }
- p.results <- result
- return nil
-}
-
func (p *processorWrapper) processTaskRunTestcase(ctx context.Context, t *asynq.Task) error {
var payload TaskPayloadRunTestcase
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
diff --git a/backend/taskqueue/queue.go b/backend/taskqueue/queue.go
index 30fe265..b348fca 100644
--- a/backend/taskqueue/queue.go
+++ b/backend/taskqueue/queue.go
@@ -20,82 +20,21 @@ func (q *Queue) Close() {
q.client.Close()
}
-func (q *Queue) EnqueueTaskCreateSubmissionRecord(
- gameID int,
- userID int,
- code string,
- codeSize int,
- codeHash MD5HexHash,
-) error {
- task, err := newTaskCreateSubmissionRecord(
- gameID,
- userID,
- code,
- codeSize,
- codeHash,
- )
- if err != nil {
- return err
- }
- _, err = q.client.Enqueue(task)
- return err
-}
-
-func (q *Queue) EnqueueTaskCompileSwiftToWasm(
- gameID int,
- userID int,
- code string,
- codeHash MD5HexHash,
- submissionID int,
-) error {
- task, err := newTaskCompileSwiftToWasm(
- gameID,
- userID,
- code,
- codeHash,
- submissionID,
- )
- if err != nil {
- return err
- }
- _, err = q.client.Enqueue(task)
- return err
-}
-
-func (q *Queue) EnqueueTaskCompileWasmToNativeExecutable(
- gameID int,
- userID int,
- codeHash MD5HexHash,
- submissionID int,
-) error {
- task, err := newTaskCompileWasmToNativeExecutable(
- gameID,
- userID,
- codeHash,
- submissionID,
- )
- if err != nil {
- return err
- }
- _, err = q.client.Enqueue(task)
- return err
-}
-
func (q *Queue) EnqueueTaskRunTestcase(
gameID int,
userID int,
- codeHash MD5HexHash,
submissionID int,
testcaseID int,
+ code string,
stdin string,
stdout string,
) error {
task, err := newTaskRunTestcase(
gameID,
userID,
- codeHash,
submissionID,
testcaseID,
+ code,
stdin,
stdout,
)
diff --git a/backend/taskqueue/tasks.go b/backend/taskqueue/tasks.go
index d5f2993..e595d99 100644
--- a/backend/taskqueue/tasks.go
+++ b/backend/taskqueue/tasks.go
@@ -8,121 +8,16 @@ import (
type TaskType string
-// MD5 hash in hexadecimal format
-type MD5HexHash string
-
const (
- TaskTypeCreateSubmissionRecord TaskType = "create_submission_record"
- TaskTypeCompileSwiftToWasm TaskType = "compile_swift_to_wasm"
- TaskTypeCompileWasmToNativeExecutable TaskType = "compile_wasm_to_native_executable"
- TaskTypeRunTestcase TaskType = "run_testcase"
+ TaskTypeRunTestcase TaskType = "run_testcase"
)
-type TaskPayloadBase struct {
- GameID int
- UserID int
- CodeHash MD5HexHash
-}
-
-type TaskPayloadCreateSubmissionRecord struct {
- TaskPayloadBase
- Code string
- CodeSize int
-}
-
-func newTaskCreateSubmissionRecord(
- gameID int,
- userID int,
- code string,
- codeSize int,
- codeHash MD5HexHash,
-) (*asynq.Task, error) {
- payload, err := json.Marshal(TaskPayloadCreateSubmissionRecord{
- TaskPayloadBase: TaskPayloadBase{
- GameID: gameID,
- UserID: userID,
- CodeHash: codeHash,
- },
- Code: code,
- CodeSize: codeSize,
- })
- if err != nil {
- return nil, err
- }
- return asynq.NewTask(string(TaskTypeCreateSubmissionRecord), payload), nil
-}
-
-func (t *TaskPayloadCreateSubmissionRecord) GameID() int { return t.TaskPayloadBase.GameID }
-func (t *TaskPayloadCreateSubmissionRecord) UserID() int { return t.TaskPayloadBase.UserID }
-func (t *TaskPayloadCreateSubmissionRecord) CodeHash() MD5HexHash { return t.TaskPayloadBase.CodeHash }
-
-type TaskPayloadCompileSwiftToWasm struct {
- TaskPayloadBase
- SubmissionID int
- Code string
-}
-
-func newTaskCompileSwiftToWasm(
- gameID int,
- userID int,
- code string,
- codeHash MD5HexHash,
- submissionID int,
-) (*asynq.Task, error) {
- payload, err := json.Marshal(TaskPayloadCompileSwiftToWasm{
- TaskPayloadBase: TaskPayloadBase{
- GameID: gameID,
- UserID: userID,
- CodeHash: codeHash,
- },
- SubmissionID: submissionID,
- Code: code,
- })
- if err != nil {
- return nil, err
- }
- return asynq.NewTask(string(TaskTypeCompileSwiftToWasm), payload), nil
-}
-
-func (t *TaskPayloadCompileSwiftToWasm) GameID() int { return t.TaskPayloadBase.GameID }
-func (t *TaskPayloadCompileSwiftToWasm) UserID() int { return t.TaskPayloadBase.UserID }
-func (t *TaskPayloadCompileSwiftToWasm) CodeHash() MD5HexHash { return t.TaskPayloadBase.CodeHash }
-
-type TaskPayloadCompileWasmToNativeExecutable struct {
- TaskPayloadBase
- SubmissionID int
-}
-
-func newTaskCompileWasmToNativeExecutable(
- gameID int,
- userID int,
- codeHash MD5HexHash,
- submissionID int,
-) (*asynq.Task, error) {
- payload, err := json.Marshal(TaskPayloadCompileWasmToNativeExecutable{
- TaskPayloadBase: TaskPayloadBase{
- GameID: gameID,
- UserID: userID,
- CodeHash: codeHash,
- },
- SubmissionID: submissionID,
- })
- if err != nil {
- return nil, err
- }
- return asynq.NewTask(string(TaskTypeCompileWasmToNativeExecutable), payload), nil
-}
-
-func (t *TaskPayloadCompileWasmToNativeExecutable) GameID() int { return t.TaskPayloadBase.GameID }
-func (t *TaskPayloadCompileWasmToNativeExecutable) UserID() int { return t.TaskPayloadBase.UserID }
-func (t *TaskPayloadCompileWasmToNativeExecutable) CodeHash() MD5HexHash {
- return t.TaskPayloadBase.CodeHash
-}
-
type TaskPayloadRunTestcase struct {
- TaskPayloadBase
+ GameID int
+ UserID int
SubmissionID int
TestcaseID int
+ Code string
Stdin string
Stdout string
}
@@ -130,20 +25,18 @@ type TaskPayloadRunTestcase struct {
func newTaskRunTestcase(
gameID int,
userID int,
- codeHash MD5HexHash,
submissionID int,
testcaseID int,
+ code string,
stdin string,
stdout string,
) (*asynq.Task, error) {
payload, err := json.Marshal(TaskPayloadRunTestcase{
- TaskPayloadBase: TaskPayloadBase{
- GameID: gameID,
- UserID: userID,
- CodeHash: codeHash,
- },
+ GameID: gameID,
+ UserID: userID,
SubmissionID: submissionID,
TestcaseID: testcaseID,
+ Code: code,
Stdin: stdin,
Stdout: stdout,
})
@@ -153,48 +46,11 @@ func newTaskRunTestcase(
return asynq.NewTask(string(TaskTypeRunTestcase), payload), nil
}
-func (t *TaskPayloadRunTestcase) GameID() int { return t.TaskPayloadBase.GameID }
-func (t *TaskPayloadRunTestcase) UserID() int { return t.TaskPayloadBase.UserID }
-func (t *TaskPayloadRunTestcase) CodeHash() MD5HexHash { return t.TaskPayloadBase.CodeHash }
-
type TaskResult interface {
Type() TaskType
GameID() int
}
-type TaskResultCreateSubmissionRecord struct {
- TaskPayload *TaskPayloadCreateSubmissionRecord
- SubmissionID int
- Err error
-}
-
-func (r *TaskResultCreateSubmissionRecord) Type() TaskType { return TaskTypeCreateSubmissionRecord }
-func (r *TaskResultCreateSubmissionRecord) GameID() int { return r.TaskPayload.GameID() }
-
-type TaskResultCompileSwiftToWasm struct {
- TaskPayload *TaskPayloadCompileSwiftToWasm
- Status string
- Stdout string
- Stderr string
- Err error
-}
-
-func (r *TaskResultCompileSwiftToWasm) Type() TaskType { return TaskTypeCompileSwiftToWasm }
-func (r *TaskResultCompileSwiftToWasm) GameID() int { return r.TaskPayload.GameID() }
-
-type TaskResultCompileWasmToNativeExecutable struct {
- TaskPayload *TaskPayloadCompileWasmToNativeExecutable
- Status string
- Stdout string
- Stderr string
- Err error
-}
-
-func (r *TaskResultCompileWasmToNativeExecutable) Type() TaskType {
- return TaskTypeCompileWasmToNativeExecutable
-}
-func (r *TaskResultCompileWasmToNativeExecutable) GameID() int { return r.TaskPayload.GameID() }
-
type TaskResultRunTestcase struct {
TaskPayload *TaskPayloadRunTestcase
Status string
@@ -204,4 +60,4 @@ type TaskResultRunTestcase struct {
}
func (r *TaskResultRunTestcase) Type() TaskType { return TaskTypeRunTestcase }
-func (r *TaskResultRunTestcase) GameID() int { return r.TaskPayload.GameID() }
+func (r *TaskResultRunTestcase) GameID() int { return r.TaskPayload.GameID }
diff --git a/backend/taskqueue/worker_server.go b/backend/taskqueue/worker_server.go
index 51387d1..7effba7 100644
--- a/backend/taskqueue/worker_server.go
+++ b/backend/taskqueue/worker_server.go
@@ -2,8 +2,6 @@ package taskqueue
import (
"github.com/hibiken/asynq"
-
- "github.com/nsfisis/phperkaigi-2025-albatross/backend/db"
)
type WorkerServer struct {
@@ -11,14 +9,14 @@ type WorkerServer struct {
processor *processorWrapper
}
-func NewWorkerServer(redisAddr string, queries *db.Queries) *WorkerServer {
+func NewWorkerServer(redisAddr string) *WorkerServer {
server := asynq.NewServer(
asynq.RedisClientOpt{
Addr: redisAddr,
},
asynq.Config{},
)
- processor := newProcessorWrapper(newProcessor(queries))
+ processor := newProcessorWrapper(newProcessor())
return &WorkerServer{
server: server,
processor: processor,
@@ -28,9 +26,6 @@ func NewWorkerServer(redisAddr string, queries *db.Queries) *WorkerServer {
func (s *WorkerServer) Run() error {
mux := asynq.NewServeMux()
- mux.HandleFunc(string(TaskTypeCreateSubmissionRecord), s.processor.processTaskCreateSubmissionRecord)
- mux.HandleFunc(string(TaskTypeCompileSwiftToWasm), s.processor.processTaskCompileSwiftToWasm)
- mux.HandleFunc(string(TaskTypeCompileWasmToNativeExecutable), s.processor.processTaskCompileWasmToNativeExecutable)
mux.HandleFunc(string(TaskTypeRunTestcase), s.processor.processTaskRunTestcase)
return s.server.Run(mux)