fix(stress): keep platform burn responsive under load

This commit is contained in:
2026-03-31 22:28:26 +03:00
parent ea660500c9
commit c9ee078622
3 changed files with 141 additions and 41 deletions

View File

@@ -10,9 +10,11 @@ import (
"os"
"os/exec"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
"syscall"
"time"
)
@@ -374,10 +376,17 @@ func buildCPUStressCmd(ctx context.Context) (*exec.Cmd, error) {
return nil, fmt.Errorf("stressapptest not found: %w", err)
}
// Use a very long duration; the context timeout will kill it at the right time.
cmd := exec.CommandContext(ctx, path, "-s", "86400", "-W", "--cc_test")
cmdArgs := []string{"-s", "86400", "-W", "--cc_test"}
if threads := platformStressCPUThreads(); threads > 0 {
cmdArgs = append(cmdArgs, "-m", strconv.Itoa(threads))
}
if mb := platformStressMemoryMB(); mb > 0 {
cmdArgs = append(cmdArgs, "-M", strconv.Itoa(mb))
}
cmd := exec.CommandContext(ctx, path, cmdArgs...)
cmd.Stdout = nil
cmd.Stderr = nil
if err := cmd.Start(); err != nil {
if err := startLowPriorityCmd(cmd, 15); err != nil {
return nil, fmt.Errorf("stressapptest start: %w", err)
}
return cmd, nil
@@ -418,7 +427,7 @@ func buildAMDGPUStressCmd(ctx context.Context) *exec.Cmd {
cmd := exec.CommandContext(ctx, rvsPath, "-c", cfgFile)
cmd.Stdout = nil
cmd.Stderr = nil
_ = cmd.Start()
_ = startLowPriorityCmd(cmd, 10)
return cmd
}
@@ -433,10 +442,50 @@ func buildNvidiaGPUStressCmd(ctx context.Context) *exec.Cmd {
cmd := exec.CommandContext(ctx, path, "--seconds", "86400", "--size-mb", "64")
cmd.Stdout = nil
cmd.Stderr = nil
_ = cmd.Start()
_ = startLowPriorityCmd(cmd, 10)
return cmd
}
// startLowPriorityCmd starts cmd and then asks the OS to set the scheduling
// priority of the new process to the given nice value.
//
// Only a failure of cmd.Start is reported to the caller. The priority change
// is best-effort: the stress workload is still useful at its starting
// priority, so an error from Setpriority is deliberately discarded.
//
// NOTE(review): the priority is applied after Start returns, so the child
// runs at its starting priority until the call lands — confirm that window
// is acceptable for the workloads launched through this helper.
func startLowPriorityCmd(cmd *exec.Cmd, nice int) error {
	startErr := cmd.Start()
	if startErr != nil {
		return startErr
	}

	proc := cmd.Process
	if proc == nil {
		return nil
	}

	// Best-effort: ignore the result so a refused priority change does not
	// turn a successfully started command into an error.
	_ = syscall.Setpriority(syscall.PRIO_PROCESS, proc.Pid, nice)
	return nil
}
// platformStressCPUThreads picks the thread count for the CPU stress command.
//
// A positive BEE_PLATFORM_STRESS_THREADS value (read through envInt) is
// returned as-is. Otherwise the count is derived from runtime.NumCPU():
// one thread on machines with at most 2 CPUs, all but one CPU on machines
// with at most 8, and all but two CPUs on anything larger.
func platformStressCPUThreads() int {
	override := envInt("BEE_PLATFORM_STRESS_THREADS", 0)
	if override > 0 {
		return override
	}

	total := runtime.NumCPU()
	if total <= 2 {
		return 1
	}

	// Number of CPUs left out of the stress run.
	reserved := 2
	if total <= 8 {
		reserved = 1
	}
	return total - reserved
}
// platformStressMemoryMB picks the memory size, in megabytes, for the CPU
// stress command.
//
// A positive BEE_PLATFORM_STRESS_MB value (read through envInt) is returned
// as-is. Otherwise the result is 60% of the value reported by freeMemBytes,
// converted to MB and raised to at least 1024. When freeMemBytes reports a
// non-positive value the function returns 0.
//
// NOTE(review): the 1024 MB floor applies even when 60% of the reported free
// memory is smaller than that, so the result can exceed the 60% target on
// low-memory hosts — confirm this is intended.
func platformStressMemoryMB() int {
	override := envInt("BEE_PLATFORM_STRESS_MB", 0)
	if override > 0 {
		return override
	}

	avail := freeMemBytes()
	if avail <= 0 {
		return 0
	}

	const floorMB = 1024
	target := int((avail * 60) / 100 / (1024 * 1024))
	if target >= floorMB {
		return target
	}
	return floorMB
}
func packPlatformDir(dir, dest string) error {
f, err := os.Create(dest)
if err != nil {

View File

@@ -0,0 +1,34 @@
package platform
import (
"runtime"
"testing"
)
// TestPlatformStressCPUThreadsOverride checks that a positive
// BEE_PLATFORM_STRESS_THREADS value is returned unchanged.
func TestPlatformStressCPUThreadsOverride(t *testing.T) {
	const want = 7
	t.Setenv("BEE_PLATFORM_STRESS_THREADS", "7")

	got := platformStressCPUThreads()
	if got == want {
		return
	}
	t.Fatalf("platformStressCPUThreads=%d want 7", got)
}
// TestPlatformStressCPUThreadsDefaultLeavesHeadroom checks the thread count
// chosen when BEE_PLATFORM_STRESS_THREADS is set to the empty string: it must
// be at least 1, never above runtime.NumCPU(), and strictly below it on
// machines with more than two CPUs.
func TestPlatformStressCPUThreadsDefaultLeavesHeadroom(t *testing.T) {
	t.Setenv("BEE_PLATFORM_STRESS_THREADS", "")

	cpus := runtime.NumCPU()
	threads := platformStressCPUThreads()

	// Cases are evaluated top to bottom, so the first violated bound is the
	// one that gets reported.
	switch {
	case threads < 1:
		t.Fatalf("platformStressCPUThreads=%d want >= 1", threads)
	case threads > cpus:
		t.Fatalf("platformStressCPUThreads=%d want <= NumCPU=%d", threads, cpus)
	case cpus > 2 && threads >= cpus:
		t.Fatalf("platformStressCPUThreads=%d want headroom below NumCPU=%d", threads, cpus)
	}
}
// TestPlatformStressMemoryMBOverride checks that a positive
// BEE_PLATFORM_STRESS_MB value is returned unchanged.
func TestPlatformStressMemoryMBOverride(t *testing.T) {
	const want = 8192
	t.Setenv("BEE_PLATFORM_STRESS_MB", "8192")

	got := platformStressMemoryMB()
	if got == want {
		return
	}
	t.Fatalf("platformStressMemoryMB=%d want 8192", got)
}

View File

@@ -36,6 +36,7 @@ typedef void *CUstream;
#define MAX_CUBLAS_PROFILES 5
#define MIN_PROFILE_BUDGET_BYTES ((size_t)4u * 1024u * 1024u)
#define MIN_STREAM_BUDGET_BYTES ((size_t)64u * 1024u * 1024u)
#define STRESS_LAUNCH_DEPTH 8
static const char *ptx_source =
".version 6.0\n"
@@ -422,24 +423,31 @@ static int run_ptx_fallback(struct cuda_api *api,
double deadline = start + (double)seconds;
while (now_seconds() < deadline) {
launches_per_wave = 0;
for (int lane = 0; lane < stream_count; lane++) {
unsigned int blocks = (unsigned int)((words[lane] + threads - 1) / threads);
if (!check_rc(api,
"cuLaunchKernel",
api->cuLaunchKernel(kernel,
blocks,
1,
1,
threads,
1,
1,
0,
streams[lane],
params[lane],
NULL))) {
goto fail;
for (int depth = 0; depth < STRESS_LAUNCH_DEPTH && now_seconds() < deadline; depth++) {
int launched_this_batch = 0;
for (int lane = 0; lane < stream_count; lane++) {
unsigned int blocks = (unsigned int)((words[lane] + threads - 1) / threads);
if (!check_rc(api,
"cuLaunchKernel",
api->cuLaunchKernel(kernel,
blocks,
1,
1,
threads,
1,
1,
0,
streams[lane],
params[lane],
NULL))) {
goto fail;
}
launches_per_wave++;
launched_this_batch++;
}
if (launched_this_batch <= 0) {
break;
}
launches_per_wave++;
}
if (launches_per_wave <= 0) {
goto fail;
@@ -460,10 +468,11 @@ static int run_ptx_fallback(struct cuda_api *api,
report->iterations = iterations;
snprintf(report->details,
sizeof(report->details),
"fallback_int32=OK requested_mb=%d actual_mb=%d streams=%d per_stream_mb=%zu iterations=%lu\n",
"fallback_int32=OK requested_mb=%d actual_mb=%d streams=%d queue_depth=%d per_stream_mb=%zu iterations=%lu\n",
size_mb,
report->buffer_mb,
report->stream_count,
STRESS_LAUNCH_DEPTH,
bytes_per_stream[0] / (1024u * 1024u),
iterations);
@@ -1184,10 +1193,11 @@ static int run_cublaslt_stress(struct cuda_api *cuda,
report->buffer_mb = (int)(total_budget / (1024u * 1024u));
append_detail(report->details,
sizeof(report->details),
"requested_mb=%d actual_mb=%d streams=%d mp_count=%d per_worker_mb=%zu\n",
"requested_mb=%d actual_mb=%d streams=%d queue_depth=%d mp_count=%d per_worker_mb=%zu\n",
size_mb,
report->buffer_mb,
report->stream_count,
STRESS_LAUNCH_DEPTH,
mp_count,
per_profile_budget / (1024u * 1024u));
@@ -1239,26 +1249,33 @@ static int run_cublaslt_stress(struct cuda_api *cuda,
double deadline = now_seconds() + (double)seconds;
while (now_seconds() < deadline) {
wave_launches = 0;
for (int i = 0; i < prepared_count; i++) {
if (!prepared[i].ready) {
continue;
}
if (!run_cublas_profile(handle, &cublas, &prepared[i])) {
append_detail(report->details,
sizeof(report->details),
"%s=FAILED runtime\n",
prepared[i].desc.name);
for (int j = 0; j < prepared_count; j++) {
destroy_profile(&cublas, cuda, &prepared[j]);
for (int depth = 0; depth < STRESS_LAUNCH_DEPTH && now_seconds() < deadline; depth++) {
int launched_this_batch = 0;
for (int i = 0; i < prepared_count; i++) {
if (!prepared[i].ready) {
continue;
}
cublas.cublasLtDestroy(handle);
destroy_streams(cuda, streams, stream_count);
cuda->cuCtxDestroy(ctx);
return 0;
if (!run_cublas_profile(handle, &cublas, &prepared[i])) {
append_detail(report->details,
sizeof(report->details),
"%s=FAILED runtime\n",
prepared[i].desc.name);
for (int j = 0; j < prepared_count; j++) {
destroy_profile(&cublas, cuda, &prepared[j]);
}
cublas.cublasLtDestroy(handle);
destroy_streams(cuda, streams, stream_count);
cuda->cuCtxDestroy(ctx);
return 0;
}
prepared[i].iterations++;
report->iterations++;
wave_launches++;
launched_this_batch++;
}
if (launched_this_batch <= 0) {
break;
}
prepared[i].iterations++;
report->iterations++;
wave_launches++;
}
if (wave_launches <= 0) {
break;