vllm.benchmarks.serve_multi

SLAVariable module-attribute

SLAVariable = Literal['request_rate', 'max_concurrency']

SLA_CRITERIA module-attribute
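
The rendered value is missing from this page. Judging from its use in `_parse_sla_item` below, it maps each operator prefix to its criterion class; a plausible reconstruction (an assumption, not verbatim source) is:

# Assumed reconstruction: `_parse_sla_item` matches operators via
# `str.startswith`, so two-character operators must precede their
# one-character prefixes.
SLA_CRITERIA: dict[str, type[SLACriterionBase]] = {
    ">=": SLAGreaterThanOrEqual,
    ">": SLAGreaterThan,
    "<=": SLALessThanOrEqual,
    "<": SLALessThan,
}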

_BAD_PARAMS_TYPE_MSG module-attribute

_BAD_PARAMS_TYPE_MSG = "The parameters to vary should be expressed as a JSON list of dictionaries."

SLACriterionBase

Bases: ABC

Source code in vllm/benchmarks/serve_multi.py
class SLACriterionBase(ABC):
    def __init__(self, target: float) -> None:
        super().__init__()

        self.target = target

    @abstractmethod
    def validate(self, actual: float) -> bool:
        """Return `True` if this criterion is met; otherwise `False`."""
        raise NotImplementedError

    @abstractmethod
    def format_cond(self, lhs: str) -> str:
        raise NotImplementedError

    def print_and_validate(
        self,
        metrics: dict[str, float],
        metrics_key: str,
    ) -> bool:
        metric = metrics[metrics_key]
        result = self.validate(metric)

        cond = self.format_cond(f"{metrics_key} = {metric:.2f}")
        print(f"Validating SLA: {cond} | " + ("PASSED" if result else "FAILED"))

        return result
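
A minimal usage sketch (hypothetical metric values; `SLAGreaterThanOrEqual` is one of the concrete subclasses below):

# Hypothetical metrics dict as produced by a benchmark run.
criterion = SLAGreaterThanOrEqual(target=5.0)
metrics = {"request_throughput": 6.2}
criterion.print_and_validate(metrics, "request_throughput")
# Prints: Validating SLA: request_throughput = 6.20>=5.00 | PASSED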

target instance-attribute

target = target

__init__

__init__(target: float) -> None
Source code in vllm/benchmarks/serve_multi.py
def __init__(self, target: float) -> None:
    super().__init__()

    self.target = target

format_cond abstractmethod

format_cond(lhs: str) -> str
Source code in vllm/benchmarks/serve_multi.py
@abstractmethod
def format_cond(self, lhs: str) -> str:
    raise NotImplementedError

print_and_validate

print_and_validate(
    metrics: dict[str, float], metrics_key: str
) -> bool
Source code in vllm/benchmarks/serve_multi.py
def print_and_validate(
    self,
    metrics: dict[str, float],
    metrics_key: str,
) -> bool:
    metric = metrics[metrics_key]
    result = self.validate(metric)

    cond = self.format_cond(f"{metrics_key} = {metric:.2f}")
    print(f"Validating SLA: {cond} | " + ("PASSED" if result else "FAILED"))

    return result

validate abstractmethod

validate(actual: float) -> bool

Return True if this criterion is met; otherwise False.

Source code in vllm/benchmarks/serve_multi.py
@abstractmethod
def validate(self, actual: float) -> bool:
    """Return `True` if this criterion is met; otherwise `False`."""
    raise NotImplementedError

SLAGreaterThan

Bases: SLACriterionBase

Source code in vllm/benchmarks/serve_multi.py
class SLAGreaterThan(SLACriterionBase):
    @override
    def validate(self, actual: float) -> bool:
        return actual > self.target

    @override
    def format_cond(self, lhs: str) -> str:
        return f"{lhs}>{self.target:.2f}"

format_cond

format_cond(lhs: str) -> str
Source code in vllm/benchmarks/serve_multi.py
@override
def format_cond(self, lhs: str) -> str:
    return f"{lhs}>{self.target:.2f}"

validate

validate(actual: float) -> bool
Source code in vllm/benchmarks/serve_multi.py
@override
def validate(self, actual: float) -> bool:
    return actual > self.target

SLAGreaterThanOrEqual

Bases: SLACriterionBase

Source code in vllm/benchmarks/serve_multi.py
class SLAGreaterThanOrEqual(SLACriterionBase):
    @override
    def validate(self, actual: float) -> bool:
        return actual >= self.target

    @override
    def format_cond(self, lhs: str) -> str:
        return f"{lhs}>={self.target:.2f}"

format_cond

format_cond(lhs: str) -> str
Source code in vllm/benchmarks/serve_multi.py
@override
def format_cond(self, lhs: str) -> str:
    return f"{lhs}>={self.target:.2f}"

validate

validate(actual: float) -> bool
Source code in vllm/benchmarks/serve_multi.py
@override
def validate(self, actual: float) -> bool:
    return actual >= self.target

SLALessThan

Bases: SLACriterionBase

Source code in vllm/benchmarks/serve_multi.py
class SLALessThan(SLACriterionBase):
    @override
    def validate(self, actual: float) -> bool:
        return actual < self.target

    @override
    def format_cond(self, lhs: str) -> str:
        return f"{lhs}<{self.target:.2f}"

format_cond

format_cond(lhs: str) -> str
Source code in vllm/benchmarks/serve_multi.py
@override
def format_cond(self, lhs: str) -> str:
    return f"{lhs}<{self.target:.2f}"

validate

validate(actual: float) -> bool
Source code in vllm/benchmarks/serve_multi.py
@override
def validate(self, actual: float) -> bool:
    return actual < self.target

SLALessThanOrEqual

Bases: SLACriterionBase

Source code in vllm/benchmarks/serve_multi.py
class SLALessThanOrEqual(SLACriterionBase):
    @override
    def validate(self, actual: float) -> bool:
        return actual <= self.target

    @override
    def format_cond(self, lhs: str) -> str:
        return f"{lhs}<={self.target:.2f}"

format_cond

format_cond(lhs: str) -> str
Source code in vllm/benchmarks/serve_multi.py
@override
def format_cond(self, lhs: str) -> str:
    return f"{lhs}<={self.target:.2f}"

validate

validate(actual: float) -> bool
Source code in vllm/benchmarks/serve_multi.py
@override
def validate(self, actual: float) -> bool:
    return actual <= self.target

ServerWrapper

Source code in vllm/benchmarks/serve_multi.py
class ServerWrapper:
    def __init__(
        self,
        server_cmd: list[str],
        after_bench_cmd: list[str],
        *,
        show_stdout: bool,
    ) -> None:
        super().__init__()

        self.server_cmd = server_cmd
        self.after_bench_cmd = after_bench_cmd
        self.show_stdout = show_stdout

    def run_subcommand(self, cmd: list[str]):
        return subprocess.run(
            cmd,
            stdout=None if self.show_stdout else subprocess.DEVNULL,
            check=True,
        )

    def after_bench(self) -> None:
        if not self.after_bench_cmd:
            self.reset_caches()
            return

        self.run_subcommand(self.after_bench_cmd)

    def _get_vllm_server_address(self) -> str:
        server_cmd = self.server_cmd

        for host_key in ("--host",):
            if host_key in server_cmd:
                host = server_cmd[server_cmd.index(host_key) + 1]
                break
        else:
            host = "localhost"

        for port_key in ("-p", "--port"):
            if port_key in server_cmd:
                port = int(server_cmd[server_cmd.index(port_key) + 1])
                break
        else:
            port = 8000  # The default value in vllm serve

        return f"http://{host}:{port}"

    def reset_caches(self) -> None:
        server_cmd = self.server_cmd

        # Use `.endswith()` to match `/bin/...`
        if server_cmd[0].endswith("vllm"):
            server_address = self._get_vllm_server_address()
            print(f"Resetting caches at {server_address}")

            res = requests.post(f"{server_address}/reset_prefix_cache")
            res.raise_for_status()

            res = requests.post(f"{server_address}/reset_mm_cache")
            res.raise_for_status()
        elif server_cmd[0].endswith("infinity_emb"):
            if "--vector-disk-cache" in server_cmd:
                raise NotImplementedError(
                    "Infinity server uses caching but does not expose a method "
                    "to reset the cache"
                )
        else:
            raise NotImplementedError(
                f"No implementation of `reset_caches` for `{server_cmd[0]}` server. "
                "Please specify a custom command via `--after-bench-cmd`."
            )
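
A construction sketch (hypothetical arguments; in practice instances are created by the `_run_server` context manager below):

# Hypothetical server command; `after_bench()` falls back to
# `reset_caches()` when no custom command is given.
server = ServerWrapper(
    ["vllm", "serve", "my-model", "--port", "8000"],
    after_bench_cmd=[],
    show_stdout=False,
)
server.after_bench()  # POSTs to /reset_prefix_cache and /reset_mm_cache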

after_bench_cmd instance-attribute

after_bench_cmd = after_bench_cmd

server_cmd instance-attribute

server_cmd = server_cmd

show_stdout instance-attribute

show_stdout = show_stdout

__init__

__init__(
    server_cmd: list[str],
    after_bench_cmd: list[str],
    *,
    show_stdout: bool,
) -> None
Source code in vllm/benchmarks/serve_multi.py
def __init__(
    self,
    server_cmd: list[str],
    after_bench_cmd: list[str],
    *,
    show_stdout: bool,
) -> None:
    super().__init__()

    self.server_cmd = server_cmd
    self.after_bench_cmd = after_bench_cmd
    self.show_stdout = show_stdout

_get_vllm_server_address

_get_vllm_server_address() -> str
Source code in vllm/benchmarks/serve_multi.py
def _get_vllm_server_address(self) -> str:
    server_cmd = self.server_cmd

    for host_key in ("--host",):
        if host_key in server_cmd:
            host = server_cmd[server_cmd.index(host_key) + 1]
            break
    else:
        host = "localhost"

    for port_key in ("-p", "--port"):
        if port_key in server_cmd:
            port = int(server_cmd[server_cmd.index(port_key) + 1])
            break
    else:
        port = 8000  # The default value in vllm serve

    return f"http://{host}:{port}"

after_bench

after_bench() -> None
Source code in vllm/benchmarks/serve_multi.py
def after_bench(self) -> None:
    if not self.after_bench_cmd:
        self.reset_caches()
        return

    self.run_subcommand(self.after_bench_cmd)

reset_caches

reset_caches() -> None
Source code in vllm/benchmarks/serve_multi.py
def reset_caches(self) -> None:
    server_cmd = self.server_cmd

    # Use `.endswith()` to match `/bin/...`
    if server_cmd[0].endswith("vllm"):
        server_address = self._get_vllm_server_address()
        print(f"Resetting caches at {server_address}")

        res = requests.post(f"{server_address}/reset_prefix_cache")
        res.raise_for_status()

        res = requests.post(f"{server_address}/reset_mm_cache")
        res.raise_for_status()
    elif server_cmd[0].endswith("infinity_emb"):
        if "--vector-disk-cache" in server_cmd:
            raise NotImplementedError(
                "Infinity server uses caching but does not expose a method "
                "to reset the cache"
            )
    else:
        raise NotImplementedError(
            f"No implementation of `reset_caches` for `{server_cmd[0]}` server. "
            "Please specify a custom command via `--after-bench-cmd`."
        )

run_subcommand

run_subcommand(cmd: list[str])
Source code in vllm/benchmarks/serve_multi.py
def run_subcommand(self, cmd: list[str]):
    return subprocess.run(
        cmd,
        stdout=None if self.show_stdout else subprocess.DEVNULL,
        check=True,
    )

_comb_needs_server

_comb_needs_server(
    serve_comb: dict[str, object],
    bench_combs: list[dict[str, object]],
    output_dir: Path,
)
Source code in vllm/benchmarks/serve_multi.py
def _comb_needs_server(
    serve_comb: dict[str, object],
    bench_combs: list[dict[str, object]],
    output_dir: Path,
):
    for bench_comb in bench_combs:
        base_path = _get_comb_base_path(output_dir, serve_comb, bench_comb)
        if not _get_comb_run_path(base_path, run_number=None).exists():
            return True

    return False

_estimate_sla_bounds

_estimate_sla_bounds(
    server: ServerWrapper | None,
    bench_cmd: list[str],
    *,
    serve_comb: dict[str, object],
    bench_comb: dict[str, object],
    sla_comb: dict[str, SLACriterionBase],
    base_path: Path,
    num_runs: int,
    dry_run: bool,
    sla_variable: SLAVariable,
    init_value: int,
    max_value: int,
)
Source code in vllm/benchmarks/serve_multi.py
def _estimate_sla_bounds(
    server: ServerWrapper | None,
    bench_cmd: list[str],
    *,
    serve_comb: dict[str, object],
    bench_comb: dict[str, object],
    sla_comb: dict[str, SLACriterionBase],
    base_path: Path,
    num_runs: int,
    dry_run: bool,
    sla_variable: SLAVariable,
    init_value: int,
    max_value: int,
):
    sla_data = list[dict[str, object]]()

    max_passing: int = 0
    min_failing: int = 0

    val: int = init_value
    assert val > 0

    while True:
        print(f"Testing {sla_variable}: {val} req/s")

        iter_data = _run_sla(
            server,
            bench_cmd,
            serve_comb=serve_comb,
            bench_comb={**bench_comb, sla_variable: val},
            iter_path=_get_sla_iter_path(base_path, sla_comb, sla_variable, val),
            num_runs=num_runs,
            dry_run=dry_run,
        )

        assert iter_data is not None
        sla_data.extend(iter_data)

        iter_data_mean = {
            k: sum(float(run_data[k]) for run_data in iter_data) / len(iter_data)  # type: ignore
            for k in sla_comb
        }

        sla_results = [
            criterion.print_and_validate(iter_data_mean, k)
            for k, criterion in sla_comb.items()
        ]

        if all(sla_results):
            print("SLA criteria are met.")
            max_passing = val
            val *= 2
        else:
            print("SLA criteria are not met.")
            min_failing = val
            break

        if val >= max_value:
            break

    return sla_data, (max_passing, min_failing)
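
The loop above doubles the load until the SLA fails or `max_value` is reached. A standalone sketch of that strategy, with a stubbed pass/fail check in place of real benchmark runs (hypothetical threshold):

def estimate_bounds(init_value: int, max_value: int, passes) -> tuple[int, int]:
    # Mirrors the control flow above: double while passing, stop at the
    # first failure or once max_value is reached.
    max_passing, min_failing = 0, 0
    val = init_value
    while True:
        if passes(val):
            max_passing = val
            val *= 2
        else:
            min_failing = val
            break
        if val >= max_value:
            break
    return max_passing, min_failing

print(estimate_bounds(1, 64, lambda v: v <= 10))  # (8, 16)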

_estimate_sla_value

_estimate_sla_value(
    run_data: dict[str, object], sla_variable: SLAVariable
)
Source code in vllm/benchmarks/serve_multi.py
def _estimate_sla_value(run_data: dict[str, object], sla_variable: SLAVariable):
    request_throughput = float(run_data["request_throughput"])  # type: ignore
    if sla_variable == "request_rate":
        return request_throughput
    if sla_variable == "max_concurrency":
        mean_latency_ms = float(run_data["mean_e2el_ms"])  # type: ignore
        return request_throughput * mean_latency_ms / 1000

    assert_never(sla_variable)
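
The `max_concurrency` estimate applies Little's law (average concurrency = throughput × mean latency). A worked example with hypothetical numbers:

# 20 req/s at a 500 ms mean E2E latency keeps ~10 requests in flight.
run_data = {"request_throughput": 20.0, "mean_e2el_ms": 500.0}
_estimate_sla_value(run_data, "request_rate")     # 20.0
_estimate_sla_value(run_data, "max_concurrency")  # 20.0 * 500 / 1000 = 10.0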

_find_sla_value

_find_sla_value(
    server: ServerWrapper | None,
    bench_cmd: list[str],
    *,
    serve_comb: dict[str, object],
    bench_comb: dict[str, object],
    sla_comb: dict[str, SLACriterionBase],
    base_path: Path,
    num_runs: int,
    dry_run: bool,
    sla_variable: SLAVariable,
    min_value: int,
    max_value: int,
)
Source code in vllm/benchmarks/serve_multi.py
def _find_sla_value(
    server: ServerWrapper | None,
    bench_cmd: list[str],
    *,
    serve_comb: dict[str, object],
    bench_comb: dict[str, object],
    sla_comb: dict[str, SLACriterionBase],
    base_path: Path,
    num_runs: int,
    dry_run: bool,
    sla_variable: SLAVariable,
    min_value: int,
    max_value: int,
):
    sla_data = list[dict[str, object]]()

    left: int = min_value
    right: int = max_value

    while True:
        val = (left + right) // 2
        print(f"Testing {sla_variable}: {val} req/s")

        iter_data = _run_sla(
            server,
            bench_cmd,
            serve_comb=serve_comb,
            bench_comb={**bench_comb, sla_variable: val},
            iter_path=_get_sla_iter_path(base_path, sla_comb, sla_variable, val),
            num_runs=num_runs,
            dry_run=dry_run,
        )

        assert iter_data is not None
        sla_data.extend(iter_data)

        iter_data_mean = {
            k: sum(float(run_data[k]) for run_data in iter_data) / len(iter_data)  # type: ignore
            for k in sla_comb
        }

        sla_results = [
            criterion.print_and_validate(iter_data_mean, k)
            for k, criterion in sla_comb.items()
        ]

        if all(sla_results):
            print("SLA criteria are met.")
            left = val
        else:
            print("SLA criteria are not met.")
            right = val

        if right - left <= 1:
            break

    return sla_data, left
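
This is a bisection over `[min_value, max_value]` that keeps the largest passing value. The same logic with a stubbed check (hypothetical threshold):

def find_value(min_value: int, max_value: int, passes) -> int:
    # Invariant: `left` passes (or is the initial lower bound) and
    # `right` fails; stop once they are adjacent.
    left, right = min_value, max_value
    while True:
        val = (left + right) // 2
        if passes(val):
            left = val
        else:
            right = val
        if right - left <= 1:
            break
    return left

print(find_value(8, 16, lambda v: v <= 10))  # 10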

_get_comb_base_path

_get_comb_base_path(
    output_dir: Path,
    serve_comb: dict[str, object],
    bench_comb: dict[str, object],
)
Source code in vllm/benchmarks/serve_multi.py
def _get_comb_base_path(
    output_dir: Path,
    serve_comb: dict[str, object],
    bench_comb: dict[str, object],
):
    return output_dir / "-".join(
        (
            "SERVE",
            *(f"{k}={v}" for k, v in serve_comb.items()),
            "BENCH",
            *(f"{k}={v}" for k, v in bench_comb.items()),
        )
    ).replace("/", "_").replace("..", "__")  # Sanitize
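
An example of the directory name it produces (hypothetical combination):

_get_comb_base_path(
    Path("results"),
    serve_comb={"max-num-seqs": 128},
    bench_comb={"random-input-len": 1024},
)
# results/SERVE-max-num-seqs=128-BENCH-random-input-len=1024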

_get_comb_run_path

_get_comb_run_path(base_path: Path, run_number: int | None)
Source code in vllm/benchmarks/serve_multi.py
def _get_comb_run_path(base_path: Path, run_number: int | None):
    if run_number is None:
        return base_path / "summary.json"

    return base_path / f"run={run_number}.json"

_get_sla_base_path

_get_sla_base_path(
    output_dir: Path,
    serve_comb: dict[str, object],
    bench_comb: dict[str, object],
)
Source code in vllm/benchmarks/serve_multi.py
def _get_sla_base_path(
    output_dir: Path,
    serve_comb: dict[str, object],
    bench_comb: dict[str, object],
):
    return output_dir / "-".join(
        (
            "SERVE",
            *(f"{k}={v}" for k, v in serve_comb.items()),
            "BENCH",
            *(f"{k}={v}" for k, v in bench_comb.items()),
        )
    ).replace("/", "_").replace("..", "__")  # Sanitize

_get_sla_iter_path

_get_sla_iter_path(
    base_path: Path,
    sla_comb: dict[str, SLACriterionBase],
    sla_variable: str,
    sla_value: int | None,
)
Source code in vllm/benchmarks/serve_multi.py
def _get_sla_iter_path(
    base_path: Path,
    sla_comb: dict[str, SLACriterionBase],
    sla_variable: str,
    sla_value: int | None,
):
    if sla_value is None:
        prefix = "-".join(v.format_cond(k) for k, v in sla_comb.items())
        return base_path / f"SLA-{prefix}.json"

    return base_path / f"{sla_variable}={sla_value}"
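
Example outputs (hypothetical SLA combination):

sla_comb = {"p99_e2el_ms": SLALessThanOrEqual(500)}
_get_sla_iter_path(Path("out"), sla_comb, "request_rate", 8)
# out/request_rate=8
_get_sla_iter_path(Path("out"), sla_comb, "request_rate", None)
# out/SLA-p99_e2el_ms<=500.00.json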

_get_sla_run_path

_get_sla_run_path(iter_path: Path, run_number: int | None)
Source code in vllm/benchmarks/serve_multi.py
def _get_sla_run_path(iter_path: Path, run_number: int | None):
    if run_number is None:
        return iter_path / "summary.json"

    return iter_path / f"run={run_number}.json"

_iter_cmd_key_candidates

_iter_cmd_key_candidates(param_key: str)
Source code in vllm/benchmarks/serve_multi.py
def _iter_cmd_key_candidates(param_key: str):
    for k in reversed(tuple(_iter_param_key_candidates(param_key))):
        yield "--" + k

_iter_param_key_candidates

_iter_param_key_candidates(param_key: str)
Source code in vllm/benchmarks/serve_multi.py
def _iter_param_key_candidates(param_key: str):
    yield param_key
    yield param_key.replace("-", "_")
    yield param_key.replace("_", "-")

_normalize_cmd_key

_normalize_cmd_key(param_key: str)
Source code in vllm/benchmarks/serve_multi.py
def _normalize_cmd_key(param_key: str):
    return next(_iter_cmd_key_candidates(param_key))
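
These three helpers let parameter keys be written in either `snake_case` or `kebab-case`. For a hypothetical key:

list(_iter_param_key_candidates("max_num_seqs"))
# ['max_num_seqs', 'max_num_seqs', 'max-num-seqs']
list(_iter_cmd_key_candidates("max_num_seqs"))
# ['--max-num-seqs', '--max_num_seqs', '--max_num_seqs']
_normalize_cmd_key("max_num_seqs")
# '--max-num-seqs'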

_override_args

_override_args(cmd: list[str], params: dict[str, object])
Source code in vllm/benchmarks/serve_multi.py
def _override_args(cmd: list[str], params: dict[str, object]):
    cmd = list(cmd)

    for k, v in params.items():
        for k_candidate in _iter_cmd_key_candidates(k):
            try:
                k_idx = cmd.index(k_candidate)

                if isinstance(v, bool):
                    cmd[k_idx] = _normalize_cmd_key(k if v else "no-" + k)
                else:
                    cmd[k_idx + 1] = str(v)

                break
            except ValueError:
                continue
        else:
            if isinstance(v, bool):
                cmd.append(_normalize_cmd_key(k if v else "no-" + k))
            else:
                cmd.extend([_normalize_cmd_key(k), str(v)])

    return cmd
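
For example (hypothetical command): an existing flag is overwritten in place, a missing one is appended, and booleans toggle between `--<key>` and `--no-<key>`:

cmd = ["vllm", "serve", "my-model", "--max-num-seqs", "64"]
_override_args(cmd, {"max_num_seqs": 128, "enable-prefix-caching": False})
# ['vllm', 'serve', 'my-model', '--max-num-seqs', '128',
#  '--no-enable-prefix-caching']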

_parse_params

_parse_params(params: list[dict[str, object]])
Source code in vllm/benchmarks/serve_multi.py
def _parse_params(params: list[dict[str, object]]):
    if not isinstance(params, list):
        raise TypeError(f"{_BAD_PARAMS_TYPE_MSG} Found JSON type {type(params)}")

    for comb in params:
        if not isinstance(comb, dict):
            raise TypeError(f"{_BAD_PARAMS_TYPE_MSG} Found item type {type(comb)}")

    return params
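
For example:

_parse_params([{"max-num-seqs": 32}, {"max-num-seqs": 64}])  # returned as-is
_parse_params({"max-num-seqs": 32})  # raises TypeError (not a list)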

_parse_sla

_parse_sla(sla: list[dict[str, str]])
Source code in vllm/benchmarks/serve_multi.py
def _parse_sla(sla: list[dict[str, str]]):
    return [_parse_sla_item(item) for item in sla]

_parse_sla_item

_parse_sla_item(sla_item: dict[str, str])
Source code in vllm/benchmarks/serve_multi.py
def _parse_sla_item(sla_item: dict[str, str]):
    sla_criteria: dict[str, SLACriterionBase] = {}

    for metric_key, metric_value in sla_item.items():
        for op_key in SLA_CRITERIA:
            if metric_value.startswith(op_key):
                sla_criteria[metric_key] = SLA_CRITERIA[op_key](
                    float(metric_value.removeprefix(op_key))
                )
                break
        else:
            raise ValueError(
                f"Invalid operator for SLA constraint '{metric_key}={metric_value}'. "
                f"Valid operators are: {set(SLA_CRITERIA)}",
            )

    return sla_criteria
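
For example (assuming the `SLA_CRITERIA` reconstruction above):

_parse_sla_item({"p99_e2el_ms": "<=500", "request_throughput": ">=5"})
# {'p99_e2el_ms': SLALessThanOrEqual(500.0),
#  'request_throughput': SLAGreaterThanOrEqual(5.0)}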

_plot_throughput_latency_curve

_plot_throughput_latency_curve(
    all_data: list[dict[str, object]],
    serve_combs: list[dict[str, object]],
    bench_comb: dict[str, object],
    output_dir: Path,
)
Source code in vllm/benchmarks/serve_multi.py
def _plot_throughput_latency_curve(
    all_data: list[dict[str, object]],
    serve_combs: list[dict[str, object]],
    bench_comb: dict[str, object],
    output_dir: Path,
):
    fig_path = output_dir / "-".join(
        (
            "BENCH",
            *(f"{k}={v}" for k, v in bench_comb.items()),
        )
    ).replace("/", "_").replace("..", "__")  # Sanitize

    df = pd.DataFrame.from_records(
        [item for item in all_data if all(item[k] == bench_comb[k] for k in bench_comb)]
    )

    # Group together points with similar throughput
    df["request_throughput"] = df["request_throughput"].round()

    # Preserve the key order using dictionary
    all_comb_keys = {k: None for comb in serve_combs for k in comb}
    for k in all_comb_keys:
        df[k] = df[k].astype(str)

    keys_per_comb = [comb.keys() for comb in serve_combs]
    if (
        all(ks == keys_per_comb[0] for ks in keys_per_comb)
        and len(keys_per_comb[0]) <= 3
    ):
        hue, style, size, *_ = (*keys_per_comb[0], None, None)
        ax = sns.lineplot(
            df,
            x="request_throughput",
            y="p99_e2el_ms",
            hue=hue,
            style=style,
            size=size,
            markers=True,
        )
    else:
        df["category"] = df[list(all_comb_keys)].agg("-".join, axis=1)
        ax = sns.lineplot(
            df,
            x="request_throughput",
            y="p99_e2el_ms",
            hue="category",
            markers=True,
        )

    sns.move_legend(ax, "upper left", bbox_to_anchor=(1, 1))

    fig = ax.get_figure()
    assert fig is not None

    fig.tight_layout()
    fig.savefig(fig_path)

_plot_throughput_latency_curves

_plot_throughput_latency_curves(
    all_data: list[dict[str, object]],
    serve_combs: list[dict[str, object]],
    bench_combs: list[dict[str, object]],
    output_dir: Path,
)
Source code in vllm/benchmarks/serve_multi.py
def _plot_throughput_latency_curves(
    all_data: list[dict[str, object]],
    serve_combs: list[dict[str, object]],
    bench_combs: list[dict[str, object]],
    output_dir: Path,
):
    for bench_comb in bench_combs:
        _plot_throughput_latency_curve(all_data, serve_combs, bench_comb, output_dir)

_run_benchmark

_run_benchmark(
    server: ServerWrapper | None,
    bench_cmd: list[str],
    *,
    serve_overrides: dict[str, object],
    bench_overrides: dict[str, object],
    run_number: int,
    output_path: Path,
    dry_run: bool,
)
Source code in vllm/benchmarks/serve_multi.py
def _run_benchmark(
    server: ServerWrapper | None,
    bench_cmd: list[str],
    *,
    serve_overrides: dict[str, object],
    bench_overrides: dict[str, object],
    run_number: int,
    output_path: Path,
    dry_run: bool,
):
    benchmark_cmd = [
        *_override_args(bench_cmd, bench_overrides),
        "--save-result",
        "--result-dir",
        str(output_path.parent),
        "--result-filename",
        output_path.name,
    ]

    print("[BEGIN BENCHMARK]")
    print(f"Benchmark overrides: {bench_overrides}")
    print(f"Run Number: {run_number}")
    print(f"Benchmark command: {benchmark_cmd}")
    print(f"Output file: {output_path}")

    run_data: dict[str, object]

    if output_path.exists():
        print("Found existing results. Skipping.")

        with output_path.open("rb") as f:
            run_data = json.load(f)
            return run_data

    if server is None:
        assert dry_run
        print("[END BENCHMARK]")
        return None

    output_path.parent.mkdir(parents=True, exist_ok=True)

    server.run_subcommand(benchmark_cmd)
    server.after_bench()

    with output_path.open("rb") as f:
        run_data = json.load(f)

    run_data["run_number"] = run_number
    run_data.update(serve_overrides)

    with output_path.open("w") as f:
        json.dump(run_data, f, indent=4)

    print("[END BENCHMARK]")

    return run_data

_run_comb

_run_comb(
    server: ServerWrapper | None,
    bench_cmd: list[str],
    *,
    serve_comb: dict[str, object],
    bench_comb: dict[str, object],
    base_path: Path,
    num_runs: int,
    dry_run: bool,
)
Source code in vllm/benchmarks/serve_multi.py
def _run_comb(
    server: ServerWrapper | None,
    bench_cmd: list[str],
    *,
    serve_comb: dict[str, object],
    bench_comb: dict[str, object],
    base_path: Path,
    num_runs: int,
    dry_run: bool,
):
    comb_data = list[dict[str, object]]()

    for run_number in range(num_runs):
        run_data = _run_benchmark(
            server,
            bench_cmd,
            serve_overrides=serve_comb,
            bench_overrides=bench_comb,
            run_number=run_number,
            output_path=_get_comb_run_path(base_path, run_number),
            dry_run=dry_run,
        )

        if run_data is not None:
            comb_data.append(run_data)

    if dry_run:
        return None

    with _get_comb_run_path(base_path, run_number=None).open("w") as f:
        json.dump(comb_data, f, indent=4)

    return comb_data

_run_main

_run_main(
    serve_cmd: list[str],
    bench_cmd: list[str],
    after_bench_cmd: list[str],
    *,
    show_stdout: bool,
    serve_params: list[dict[str, object]],
    bench_params: list[dict[str, object]],
    sla_params: list[dict[str, SLACriterionBase]],
    sla_variable: SLAVariable,
    output_dir: Path,
    num_runs: int,
    dry_run: bool,
)
Source code in vllm/benchmarks/serve_multi.py
def _run_main(
    serve_cmd: list[str],
    bench_cmd: list[str],
    after_bench_cmd: list[str],
    *,
    show_stdout: bool,
    serve_params: list[dict[str, object]],
    bench_params: list[dict[str, object]],
    sla_params: list[dict[str, SLACriterionBase]],
    sla_variable: SLAVariable,
    output_dir: Path,
    num_runs: int,
    dry_run: bool,
):
    if sla_params:
        return run_slas(
            serve_cmd=serve_cmd,
            bench_cmd=bench_cmd,
            after_bench_cmd=after_bench_cmd,
            show_stdout=show_stdout,
            serve_params=serve_params,
            bench_params=bench_params,
            sla_params=sla_params,
            sla_variable=sla_variable,
            output_dir=output_dir,
            num_runs=num_runs,
            dry_run=dry_run,
        )

    return run_combs(
        serve_cmd=serve_cmd,
        bench_cmd=bench_cmd,
        after_bench_cmd=after_bench_cmd,
        show_stdout=show_stdout,
        serve_params=serve_params,
        bench_params=bench_params,
        output_dir=output_dir,
        num_runs=num_runs,
        dry_run=dry_run,
    )

_run_server

_run_server(
    serve_cmd: list[str],
    after_bench_cmd: list[str],
    *,
    show_stdout: bool,
    serve_overrides: dict[str, object],
    dry_run: bool,
)
Source code in vllm/benchmarks/serve_multi.py
@contextlib.contextmanager
def _run_server(
    serve_cmd: list[str],
    after_bench_cmd: list[str],
    *,
    show_stdout: bool,
    serve_overrides: dict[str, object],
    dry_run: bool,
):
    server_cmd = _override_args(serve_cmd, serve_overrides)

    print("[BEGIN SERVER]")
    print(f"Server overrides: {serve_overrides}")
    print(f"Server command: {server_cmd}")

    if dry_run:
        yield None
        print("[END SERVER]")
        return

    # Create new process group for clean termination
    server_process = subprocess.Popen(
        server_cmd,
        start_new_session=True,
        stdout=None if show_stdout else subprocess.DEVNULL,
        # Need VLLM_SERVER_DEV_MODE=1 for `reset_caches`
        env={**os.environ, "VLLM_SERVER_DEV_MODE": "1"},
    )

    try:
        yield ServerWrapper(
            server_cmd,
            after_bench_cmd,
            show_stdout=show_stdout,
        )
    finally:
        if server_process.poll() is None:
            # In case only some processes have been terminated
            with contextlib.suppress(ProcessLookupError):
                # We need to kill both API Server and Engine processes
                os.killpg(os.getpgid(server_process.pid), signal.SIGKILL)

        print("[END SERVER]")

_run_sla

_run_sla(
    server: ServerWrapper | None,
    bench_cmd: list[str],
    *,
    serve_comb: dict[str, object],
    bench_comb: dict[str, object],
    iter_path: Path,
    num_runs: int,
    dry_run: bool,
)
Source code in vllm/benchmarks/serve_multi.py
def _run_sla(
    server: ServerWrapper | None,
    bench_cmd: list[str],
    *,
    serve_comb: dict[str, object],
    bench_comb: dict[str, object],
    iter_path: Path,
    num_runs: int,
    dry_run: bool,
):
    iter_data = list[dict[str, object]]()

    for run_number in range(num_runs):
        run_data = _run_benchmark(
            server,
            bench_cmd,
            serve_overrides=serve_comb,
            bench_overrides=bench_comb,
            run_number=run_number,
            output_path=_get_sla_run_path(iter_path, run_number),
            dry_run=dry_run,
        )

        if run_data is not None:
            iter_data.append(run_data)

    if dry_run:
        return None

    with _get_sla_run_path(iter_path, run_number=None).open("w") as f:
        json.dump(iter_data, f, indent=4)

    return iter_data

_search_sla

_search_sla(
    server: ServerWrapper | None,
    bench_cmd: list[str],
    *,
    serve_comb: dict[str, object],
    bench_comb: dict[str, object],
    sla_comb: dict[str, SLACriterionBase],
    sla_variable: SLAVariable,
    sla_inf_value: int = 65536,
    base_path: Path,
    num_runs: int,
    dry_run: bool,
)
Source code in vllm/benchmarks/serve_multi.py
def _search_sla(
    server: ServerWrapper | None,
    bench_cmd: list[str],
    *,
    serve_comb: dict[str, object],
    bench_comb: dict[str, object],
    sla_comb: dict[str, SLACriterionBase],
    sla_variable: SLAVariable,
    sla_inf_value: int = 65536,  # The value that represents infinite QPS
    base_path: Path,
    num_runs: int,
    dry_run: bool,
):
    print("[SLA START]")
    print(f"SLA criteria: {', '.join(v.format_cond(k) for k, v in sla_comb.items())}")

    sla_data_0 = _run_sla(
        server,
        bench_cmd,
        serve_comb=serve_comb,
        bench_comb={**bench_comb, sla_variable: sla_inf_value},
        iter_path=_get_sla_iter_path(base_path, sla_comb, sla_variable, sla_inf_value),
        num_runs=num_runs,
        dry_run=dry_run,
    )
    if sla_data_0 is None:
        assert dry_run
        print("Omitting SLA search.")
        print("[SLA END]")
        return None

    sla_init_value = math.ceil(
        sum(_estimate_sla_value(item, sla_variable) for item in sla_data_0)
        / len(sla_data_0)
    )
    print(f"Initial {sla_variable} to search: {sla_init_value} req/s.")

    sla_data_1, (sla_min, sla_max) = _estimate_sla_bounds(
        server,
        bench_cmd,
        serve_comb=serve_comb,
        bench_comb=bench_comb,
        sla_comb=sla_comb,
        base_path=base_path,
        num_runs=num_runs,
        dry_run=dry_run,
        sla_variable=sla_variable,
        init_value=sla_init_value,
        max_value=sla_inf_value,
    )
    print(f"Range of {sla_variable} to search: [{sla_min}, {sla_max}] req/s.")

    sla_data_2, sla_value = _find_sla_value(
        server,
        bench_cmd,
        serve_comb=serve_comb,
        bench_comb=bench_comb,
        sla_comb=sla_comb,
        base_path=base_path,
        num_runs=num_runs,
        dry_run=dry_run,
        sla_variable=sla_variable,
        min_value=sla_min,
        max_value=sla_max,
    )

    sla_data = sla_data_0 + sla_data_1 + sla_data_2
    print(f"Maximum {sla_variable} for SLA: {sla_value} req/s.")

    with _get_sla_iter_path(
        base_path,
        sla_comb,
        sla_variable,
        sla_value=None,
    ).open("w") as f:
        json.dump(sla_data, f, indent=4)

    print("[SLA END]")

    return sla_data

_sla_needs_server

_sla_needs_server(
    serve_comb: dict[str, object],
    bench_combs: list[dict[str, object]],
    sla_combs: list[dict[str, SLACriterionBase]],
    sla_variable: str,
    output_dir: Path,
)
Source code in vllm/benchmarks/serve_multi.py
def _sla_needs_server(
    serve_comb: dict[str, object],
    bench_combs: list[dict[str, object]],
    sla_combs: list[dict[str, SLACriterionBase]],
    sla_variable: str,
    output_dir: Path,
):
    for bench_comb in bench_combs:
        base_path = _get_sla_base_path(output_dir, serve_comb, bench_comb)
        for sla_comb in sla_combs:
            if not _get_sla_iter_path(
                base_path,
                sla_comb,
                sla_variable,
                sla_value=None,
            ).exists():
                return True

    return False

main

main()
Source code in vllm/benchmarks/serve_multi.py
def main():
    parser = argparse.ArgumentParser(
        description="Run vLLM server benchmark on a parameter grid of settings."
    )
    parser.add_argument(
        "--serve-cmd",
        type=str,
        required=True,
        help="The command used to run the server: `vllm serve ...`",
    )
    parser.add_argument(
        "--bench-cmd",
        type=str,
        required=True,
        help="The command used to run the benchmark: `vllm bench serve ...`",
    )
    parser.add_argument(
        "--after-bench-cmd",
        type=str,
        default=None,
        help="After a benchmark run is complete, invoke this command instead of the "
        "default `ServerWrapper.clear_cache()`.",
    )
    parser.add_argument(
        "--show-stdout",
        action="store_true",
        help="If set, logs the standard output of subcommands. "
        "Useful for debugging but can be quite spammy.",
    )
    parser.add_argument(
        "--serve-params",
        type=str,
        default=None,
        help="Path to JSON file containing a list of parameter combinations "
        "for the `vllm serve` command. "
        "If both `serve_params` and `bench_params` are given, "
        "this script will iterate over their Cartesian product.",
    )
    parser.add_argument(
        "--bench-params",
        type=str,
        default=None,
        help="Path to JSON file containing a list of parameter combinations "
        "for the `vllm bench serve` command. "
        "If both `serve_params` and `bench_params` are given, "
        "this script will iterate over their Cartesian product.",
    )
    parser.add_argument(
        "--sla-params",
        type=str,
        default=None,
        help="Path to JSON file containing a list of SLA constraints to satisfy. "
        'Each constraint is expressed in `{"<KEY>": "<OP><VALUE>"}` format, '
        'e.g.: `{"p99_e2el_ms": "<=500"}` means that '
        "the E2E latency should be less than 500ms 99% of the time. "
        "Setting this option runs this script in SLA mode, which searches for the "
        "maximum `sla_variable` that satisfies the constraints for each combination "
        "of `serve_params`, `bench_params`, and `sla_params`.",
    )
    parser.add_argument(
        "--sla-variable",
        type=str,
        choices=get_args(SLAVariable),
        default="request_rate",
        help="Whether to tune request rate or maximum concurrency to satisfy "
        "the SLA constraints.",
    )
    parser.add_argument(
        "-o",
        "--output-dir",
        type=str,
        default="results",
        help="The directory to which results are written.",
    )
    parser.add_argument(
        "--num-runs",
        type=int,
        default=3,
        help="Number of runs per parameter combination.",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="If set, prints the commands to run then exits without running them.",
    )
    parser.add_argument(
        "--resume",
        type=str,
        default=None,
        help="Set this to the name of a directory under `output_dir` (which is a "
        "timestamp) to resume a previous execution of this script, i.e., only run "
        "parameter combinations for which there are still no output files.",
    )

    args = parser.parse_args()

    serve_cmd = shlex.split(args.serve_cmd)
    bench_cmd = shlex.split(args.bench_cmd)
    after_bench_cmd = (
        [] if args.after_bench_cmd is None else shlex.split(args.after_bench_cmd)
    )

    serve_params: list[dict[str, object]]
    if args.serve_params:
        with open(args.serve_params, "rb") as f:
            serve_params = _parse_params(json.load(f))
    else:
        # i.e.: run serve_cmd without any modification
        serve_params = [{}]

    bench_params: list[dict[str, object]]
    if args.bench_params:
        with open(args.bench_params, "rb") as f:
            bench_params = _parse_params(json.load(f))
    else:
        # i.e.: run bench_cmd without any modification
        bench_params = [{}]

    sla_params: list[dict[str, SLACriterionBase]]
    if args.sla_params:
        with open(args.sla_params, "rb") as f:
            sla_params = _parse_sla(json.load(f))
    else:
        sla_params = []

    num_runs = args.num_runs
    if num_runs < 1:
        raise ValueError("`num_runs` should be at least 1.")

    run_main(
        serve_cmd=serve_cmd,
        bench_cmd=bench_cmd,
        after_bench_cmd=after_bench_cmd,
        show_stdout=args.show_stdout,
        serve_params=serve_params,
        bench_params=bench_params,
        sla_params=sla_params,
        sla_variable=args.sla_variable,
        output_dir=Path(args.output_dir),
        num_runs=num_runs,
        dry_run=args.dry_run,
        resume=args.resume,
    )
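
A programmatic equivalent of a dry-run CLI invocation (all paths and parameters hypothetical):

run_main(
    serve_cmd=shlex.split("vllm serve my-model"),
    bench_cmd=shlex.split("vllm bench serve --model my-model"),
    after_bench_cmd=[],
    show_stdout=False,
    serve_params=[{"max-num-seqs": 32}, {"max-num-seqs": 64}],
    bench_params=[{"random-input-len": 1024}],
    sla_params=[],  # empty => grid mode (run_combs)
    sla_variable="request_rate",
    output_dir=Path("results"),
    num_runs=3,
    dry_run=True,  # print commands without executing them
    resume=None,
)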

run_combs

run_combs(
    serve_cmd: list[str],
    bench_cmd: list[str],
    after_bench_cmd: list[str],
    *,
    show_stdout: bool,
    serve_params: list[dict[str, object]],
    bench_params: list[dict[str, object]],
    output_dir: Path,
    num_runs: int,
    dry_run: bool,
)
Source code in vllm/benchmarks/serve_multi.py
def run_combs(
    serve_cmd: list[str],
    bench_cmd: list[str],
    after_bench_cmd: list[str],
    *,
    show_stdout: bool,
    serve_params: list[dict[str, object]],
    bench_params: list[dict[str, object]],
    output_dir: Path,
    num_runs: int,
    dry_run: bool,
):
    all_data = list[dict[str, object]]()
    for serve_comb in serve_params:
        with (
            _run_server(
                serve_cmd,
                after_bench_cmd,
                show_stdout=show_stdout,
                serve_overrides=serve_comb,
                dry_run=dry_run,
            )
            if _comb_needs_server(serve_comb, bench_params, output_dir)
            else contextlib.nullcontext()
        ) as server:
            for bench_comb in bench_params:
                base_path = _get_comb_base_path(output_dir, serve_comb, bench_comb)

                comb_data = _run_comb(
                    server,
                    bench_cmd,
                    serve_comb=serve_comb,
                    bench_comb=bench_comb,
                    base_path=base_path,
                    num_runs=num_runs,
                    dry_run=dry_run,
                )

                if comb_data is not None:
                    all_data.extend(comb_data)

    if dry_run:
        return None

    combined_df = pd.DataFrame.from_records(all_data)
    combined_df.to_csv(output_dir / "summary.csv")

    return combined_df

run_main

run_main(
    serve_cmd: list[str],
    bench_cmd: list[str],
    after_bench_cmd: list[str],
    *,
    show_stdout: bool,
    serve_params: list[dict[str, object]],
    bench_params: list[dict[str, object]],
    sla_params: list[dict[str, SLACriterionBase]],
    sla_variable: SLAVariable,
    output_dir: Path,
    num_runs: int,
    dry_run: bool,
    resume: str | None,
)
Source code in vllm/benchmarks/serve_multi.py
def run_main(
    serve_cmd: list[str],
    bench_cmd: list[str],
    after_bench_cmd: list[str],
    *,
    show_stdout: bool,
    serve_params: list[dict[str, object]],
    bench_params: list[dict[str, object]],
    sla_params: list[dict[str, SLACriterionBase]],
    sla_variable: SLAVariable,
    output_dir: Path,
    num_runs: int,
    dry_run: bool,
    resume: str | None,
):
    timestamp = resume or datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir = output_dir / timestamp

    if resume and not output_dir.exists():
        raise ValueError(f"Cannot resume from non-existent directory ({output_dir})")

    try:
        return _run_main(
            serve_cmd=serve_cmd,
            bench_cmd=bench_cmd,
            after_bench_cmd=after_bench_cmd,
            show_stdout=show_stdout,
            serve_params=serve_params,
            bench_params=bench_params,
            sla_params=sla_params,
            sla_variable=sla_variable,
            output_dir=output_dir,
            num_runs=num_runs,
            dry_run=dry_run,
        )
    except BaseException as exc:
        raise RuntimeError(
            f"The script was terminated early. Use `--resume {timestamp}` "
            f"to continue the script from its last checkpoint."
        ) from exc

run_slas

run_slas(
    serve_cmd: list[str],
    bench_cmd: list[str],
    after_bench_cmd: list[str],
    *,
    show_stdout: bool,
    serve_params: list[dict[str, object]],
    bench_params: list[dict[str, object]],
    sla_params: list[dict[str, SLACriterionBase]],
    sla_variable: SLAVariable,
    output_dir: Path,
    num_runs: int,
    dry_run: bool,
)
Source code in vllm/benchmarks/serve_multi.py
def run_slas(
    serve_cmd: list[str],
    bench_cmd: list[str],
    after_bench_cmd: list[str],
    *,
    show_stdout: bool,
    serve_params: list[dict[str, object]],
    bench_params: list[dict[str, object]],
    sla_params: list[dict[str, SLACriterionBase]],
    sla_variable: SLAVariable,
    output_dir: Path,
    num_runs: int,
    dry_run: bool,
):
    if any(
        k in bench_comb
        for bench_comb in bench_params
        for k in _iter_param_key_candidates(sla_variable)
    ):
        raise ValueError(
            f"You should not override `{sla_variable}` in `bench_params` in SLA mode, "
            "since it is supposed to be determined automatically."
        )

    all_data = list[dict[str, object]]()
    for serve_comb in serve_params:
        with (
            _run_server(
                serve_cmd,
                after_bench_cmd,
                show_stdout=show_stdout,
                serve_overrides=serve_comb,
                dry_run=dry_run,
            )
            if _sla_needs_server(
                serve_comb,
                bench_params,
                sla_params,
                sla_variable,
                output_dir,
            )
            else contextlib.nullcontext()
        ) as server:
            for bench_comb in bench_params:
                for sla_comb in sla_params:
                    base_path = _get_sla_base_path(output_dir, serve_comb, bench_comb)

                    comb_data = _search_sla(
                        server,
                        bench_cmd,
                        serve_comb=serve_comb,
                        bench_comb=bench_comb,
                        sla_comb=sla_comb,
                        sla_variable=sla_variable,
                        base_path=base_path,
                        num_runs=num_runs,
                        dry_run=dry_run,
                    )

                    if comb_data is not None:
                        all_data.extend(comb_data)

    if dry_run:
        return None

    combined_df = pd.DataFrame.from_records(all_data)
    combined_df.to_csv(output_dir / "summary.csv")

    _plot_throughput_latency_curves(all_data, serve_params, bench_params, output_dir)

    return combined_df