Split NVIDIA tasks by homogeneous GPU groups

This commit is contained in:
Mikhail Chusavitin
2026-04-06 11:58:13 +03:00
parent fc5c100a29
commit 981315e6fd
5 changed files with 569 additions and 75 deletions

View File

@@ -11,6 +11,7 @@ import (
"os/exec"
"path/filepath"
"regexp"
"sort"
"strings"
"sync/atomic"
"syscall"
@@ -21,6 +22,12 @@ import (
)
var ansiEscapeRE = regexp.MustCompile(`\x1b\[[0-9;]*[a-zA-Z]|\x1b[()][A-Z0-9]|\x1b[DABC]`)
var apiListNvidiaGPUs = func(a *app.App) ([]platform.NvidiaGPU, error) {
if a == nil {
return nil, fmt.Errorf("app not configured")
}
return a.ListNvidiaGPUs()
}
// ── Job ID counter ────────────────────────────────────────────────────────────
@@ -30,6 +37,206 @@ func newJobID(prefix string) string {
return fmt.Sprintf("%s-%d", prefix, jobCounter.Add(1))
}
type taskRunResponse struct {
TaskID string `json:"task_id,omitempty"`
JobID string `json:"job_id,omitempty"`
TaskIDs []string `json:"task_ids,omitempty"`
JobIDs []string `json:"job_ids,omitempty"`
TaskCount int `json:"task_count,omitempty"`
}
type nvidiaTaskSelection struct {
GPUIndices []int
Label string
}
func writeTaskRunResponse(w http.ResponseWriter, tasks []*Task) {
if len(tasks) == 0 {
writeJSON(w, taskRunResponse{})
return
}
ids := make([]string, 0, len(tasks))
for _, t := range tasks {
if t == nil || strings.TrimSpace(t.ID) == "" {
continue
}
ids = append(ids, t.ID)
}
resp := taskRunResponse{TaskCount: len(ids)}
if len(ids) > 0 {
resp.TaskID = ids[0]
resp.JobID = ids[0]
resp.TaskIDs = ids
resp.JobIDs = ids
}
writeJSON(w, resp)
}
func shouldSplitHomogeneousNvidiaTarget(target string) bool {
switch strings.TrimSpace(target) {
case "nvidia", "nvidia-targeted-stress", "nvidia-benchmark", "nvidia-compute",
"nvidia-targeted-power", "nvidia-pulse", "nvidia-interconnect",
"nvidia-bandwidth", "nvidia-stress":
return true
default:
return false
}
}
func expandHomogeneousNvidiaSelections(gpus []platform.NvidiaGPU, include, exclude []int) ([]nvidiaTaskSelection, error) {
if len(gpus) == 0 {
return nil, fmt.Errorf("no NVIDIA GPUs detected")
}
indexed := make(map[int]platform.NvidiaGPU, len(gpus))
allIndices := make([]int, 0, len(gpus))
for _, gpu := range gpus {
indexed[gpu.Index] = gpu
allIndices = append(allIndices, gpu.Index)
}
sort.Ints(allIndices)
selected := allIndices
if len(include) > 0 {
selected = make([]int, 0, len(include))
seen := make(map[int]struct{}, len(include))
for _, idx := range include {
if _, ok := indexed[idx]; !ok {
continue
}
if _, dup := seen[idx]; dup {
continue
}
seen[idx] = struct{}{}
selected = append(selected, idx)
}
sort.Ints(selected)
}
if len(exclude) > 0 {
skip := make(map[int]struct{}, len(exclude))
for _, idx := range exclude {
skip[idx] = struct{}{}
}
filtered := selected[:0]
for _, idx := range selected {
if _, ok := skip[idx]; ok {
continue
}
filtered = append(filtered, idx)
}
selected = filtered
}
if len(selected) == 0 {
return nil, fmt.Errorf("no NVIDIA GPUs selected")
}
modelGroups := make(map[string][]platform.NvidiaGPU)
modelOrder := make([]string, 0)
for _, idx := range selected {
gpu := indexed[idx]
model := strings.TrimSpace(gpu.Name)
if model == "" {
model = fmt.Sprintf("GPU %d", gpu.Index)
}
if _, ok := modelGroups[model]; !ok {
modelOrder = append(modelOrder, model)
}
modelGroups[model] = append(modelGroups[model], gpu)
}
sort.Slice(modelOrder, func(i, j int) bool {
left := modelGroups[modelOrder[i]]
right := modelGroups[modelOrder[j]]
if len(left) == 0 || len(right) == 0 {
return modelOrder[i] < modelOrder[j]
}
return left[0].Index < right[0].Index
})
var groups []nvidiaTaskSelection
var singles []nvidiaTaskSelection
for _, model := range modelOrder {
group := modelGroups[model]
sort.Slice(group, func(i, j int) bool { return group[i].Index < group[j].Index })
indices := make([]int, 0, len(group))
for _, gpu := range group {
indices = append(indices, gpu.Index)
}
if len(indices) >= 2 {
groups = append(groups, nvidiaTaskSelection{
GPUIndices: indices,
Label: fmt.Sprintf("%s; GPUs %s", model, joinTaskIndices(indices)),
})
continue
}
gpu := group[0]
singles = append(singles, nvidiaTaskSelection{
GPUIndices: []int{gpu.Index},
Label: fmt.Sprintf("GPU %d — %s", gpu.Index, model),
})
}
return append(groups, singles...), nil
}
func joinTaskIndices(indices []int) string {
parts := make([]string, 0, len(indices))
for _, idx := range indices {
parts = append(parts, fmt.Sprintf("%d", idx))
}
return strings.Join(parts, ",")
}
func formatSplitTaskName(baseName, selectionLabel string) string {
baseName = strings.TrimSpace(baseName)
selectionLabel = strings.TrimSpace(selectionLabel)
if baseName == "" {
return selectionLabel
}
if selectionLabel == "" {
return baseName
}
return baseName + " (" + selectionLabel + ")"
}
func buildNvidiaTaskSet(target string, priority int, createdAt time.Time, params taskParams, baseName string, appRef *app.App, idPrefix string) ([]*Task, error) {
if !shouldSplitHomogeneousNvidiaTarget(target) {
t := &Task{
ID: newJobID(idPrefix),
Name: baseName,
Target: target,
Priority: priority,
Status: TaskPending,
CreatedAt: createdAt,
params: params,
}
return []*Task{t}, nil
}
gpus, err := apiListNvidiaGPUs(appRef)
if err != nil {
return nil, err
}
selections, err := expandHomogeneousNvidiaSelections(gpus, params.GPUIndices, params.ExcludeGPUIndices)
if err != nil {
return nil, err
}
tasks := make([]*Task, 0, len(selections))
for _, selection := range selections {
taskParamsCopy := params
taskParamsCopy.GPUIndices = append([]int(nil), selection.GPUIndices...)
taskParamsCopy.ExcludeGPUIndices = nil
displayName := formatSplitTaskName(baseName, selection.Label)
taskParamsCopy.DisplayName = displayName
tasks = append(tasks, &Task{
ID: newJobID(idPrefix),
Name: displayName,
Target: target,
Priority: priority,
Status: TaskPending,
CreatedAt: createdAt,
params: taskParamsCopy,
})
}
return tasks, nil
}
// ── SSE helpers ───────────────────────────────────────────────────────────────
func sseWrite(w http.ResponseWriter, event, data string) bool {
@@ -207,28 +414,28 @@ func (h *handler) handleAPISATRun(target string) http.HandlerFunc {
}
name := taskDisplayName(target, body.Profile, body.Loader)
t := &Task{
ID: newJobID("sat-" + target),
Name: name,
Target: target,
Status: TaskPending,
CreatedAt: time.Now(),
params: taskParams{
Duration: body.Duration,
DiagLevel: body.DiagLevel,
GPUIndices: body.GPUIndices,
ExcludeGPUIndices: body.ExcludeGPUIndices,
Loader: body.Loader,
BurnProfile: body.Profile,
DisplayName: body.DisplayName,
PlatformComponents: body.PlatformComponents,
},
}
if strings.TrimSpace(body.DisplayName) != "" {
t.Name = body.DisplayName
name = body.DisplayName
}
globalQueue.enqueue(t)
writeJSON(w, map[string]string{"task_id": t.ID, "job_id": t.ID})
params := taskParams{
Duration: body.Duration,
DiagLevel: body.DiagLevel,
GPUIndices: body.GPUIndices,
ExcludeGPUIndices: body.ExcludeGPUIndices,
Loader: body.Loader,
BurnProfile: body.Profile,
DisplayName: body.DisplayName,
PlatformComponents: body.PlatformComponents,
}
tasks, err := buildNvidiaTaskSet(target, 0, time.Now(), params, name, h.opts.App, "sat-"+target)
if err != nil {
writeError(w, http.StatusBadRequest, err.Error())
return
}
for _, t := range tasks {
globalQueue.enqueue(t)
}
writeTaskRunResponse(w, tasks)
}
}
@@ -257,27 +464,26 @@ func (h *handler) handleAPIBenchmarkNvidiaRun(w http.ResponseWriter, r *http.Req
if body.RunNCCL != nil {
runNCCL = *body.RunNCCL
}
t := &Task{
ID: newJobID("benchmark-nvidia"),
Name: taskDisplayName("nvidia-benchmark", "", ""),
Target: "nvidia-benchmark",
Priority: 15,
Status: TaskPending,
CreatedAt: time.Now(),
params: taskParams{
GPUIndices: body.GPUIndices,
ExcludeGPUIndices: body.ExcludeGPUIndices,
SizeMB: body.SizeMB,
BenchmarkProfile: body.Profile,
RunNCCL: runNCCL,
DisplayName: body.DisplayName,
},
}
name := taskDisplayName("nvidia-benchmark", "", "")
if strings.TrimSpace(body.DisplayName) != "" {
t.Name = body.DisplayName
name = body.DisplayName
}
globalQueue.enqueue(t)
writeJSON(w, map[string]string{"task_id": t.ID, "job_id": t.ID})
tasks, err := buildNvidiaTaskSet("nvidia-benchmark", 15, time.Now(), taskParams{
GPUIndices: body.GPUIndices,
ExcludeGPUIndices: body.ExcludeGPUIndices,
SizeMB: body.SizeMB,
BenchmarkProfile: body.Profile,
RunNCCL: runNCCL,
DisplayName: body.DisplayName,
}, name, h.opts.App, "benchmark-nvidia")
if err != nil {
writeError(w, http.StatusBadRequest, err.Error())
return
}
for _, t := range tasks {
globalQueue.enqueue(t)
}
writeTaskRunResponse(w, tasks)
}
func (h *handler) handleAPISATStream(w http.ResponseWriter, r *http.Request) {