Skip to content
Draft
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
)
from google.cloud.storage import __version__

# Default Cloud Storage API hostname. Used as the gRPC channel host whenever
# client_options is absent or does not provide an api_endpoint.
_DEFAULT_HOST = "storage.googleapis.com"


class AsyncGrpcClient:
"""An asynchronous client for interacting with Google Cloud Storage using the gRPC API.
Expand Down Expand Up @@ -109,7 +111,15 @@ def _create_async_grpc_client(

primary_user_agent = client_info.to_user_agent()

host = _DEFAULT_HOST
quota_project_id = None
if client_options:
host = getattr(client_options, "api_endpoint", None) or _DEFAULT_HOST
quota_project_id = getattr(client_options, "quota_project_id", None)

channel = transport_cls.create_channel(
host=host,
quota_project_id=quota_project_id,
attempt_direct_path=attempt_direct_path,
credentials=credentials,
options=(("grpc.primary_user_agent", primary_user_agent),),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,29 @@ def publish_benchmark_extra_info(
benchmark.extra_info["bucket_name"] = params.bucket_name
benchmark.extra_info["bucket_type"] = params.bucket_type
benchmark.extra_info["processes"] = params.num_processes
benchmark.extra_info["num_downloads_after_open"] = params.num_downloads_after_open
benchmark.extra_info["ignore_first_download"] = params.ignore_first_download
benchmark.group = benchmark_group

if download_bytes_list is not None:
assert duration is not None, (
"Duration must be provided if total_bytes_transferred is provided."
)
throughputs_list = [x / duration / (1024 * 1024) for x in download_bytes_list]
effective_downloads = params.num_downloads_after_open
if params.ignore_first_download:
effective_downloads -= 1
effective_downloads = max(1, effective_downloads)

if params.pattern == "whole":
# duration is total time for all rounds
duration_per_round = duration / len(download_bytes_list)
throughputs_list = [
(x / duration_per_round / effective_downloads) / (1024 * 1024)
for x in download_bytes_list
]
else:
# duration is time per round
throughputs_list = [x / duration / (1024 * 1024) for x in download_bytes_list]
min_throughput = min(throughputs_list)
max_throughput = max(throughputs_list)
mean_throughput = statistics.mean(throughputs_list)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,17 @@
from io import BytesIO

import pytest
import aiohttp
import subprocess
import math

# Obtain an access token from the gcloud CLI once, at module import time.
# It is sent as the "Authorization: Bearer ..." header on the JSON API
# download requests issued by this module.
# check=True makes a non-zero gcloud exit status raise CalledProcessError, so
# importing this module fails if gcloud cannot print a token.
# NOTE(review): the token is fetched only once and never refreshed — confirm
# that benchmark runs finish within the token's lifetime.
token = subprocess.run(
["gcloud", "auth", "print-access-token"],
capture_output=True,
text=True,
check=True,
).stdout.strip()


import tests.perf.microbenchmarks.time_based.reads.config as config
from google.cloud.storage.asyncio.async_grpc_client import AsyncGrpcClient
Expand All @@ -48,6 +59,7 @@ async def create_client():
# Per-worker-process state. These start as None and are populated by
# _worker_init, which declares them global and assigns them.
# Event loop created for the worker process.
worker_loop = None
# Client created via create_client() for the "zonal" bucket type.
worker_client = None
# google.cloud.storage.Client created for the regional bucket type.
worker_json_client = None
# aiohttp.ClientSession created for the regional bucket type.
worker_session = None


# TODO: b/479135274 close clients properly.
Expand All @@ -59,23 +71,28 @@ def _worker_init(bucket_type):
0, {i for i in range(0, os.cpu_count()) if i not in cpu_affinity}
)

global worker_loop, worker_client, worker_json_client
global worker_loop, worker_client, worker_json_client, worker_session
if bucket_type == "zonal":
worker_loop = asyncio.new_event_loop()
asyncio.set_event_loop(worker_loop)
worker_client = worker_loop.run_until_complete(create_client())
else: # regional
from google.cloud import storage
worker_loop = asyncio.new_event_loop()
asyncio.set_event_loop(worker_loop)
async def _init_session():
return aiohttp.ClientSession()
worker_session = worker_loop.run_until_complete(_init_session())

worker_json_client = storage.Client()
import atexit
def _cleanup_session():
if not worker_session.closed:
worker_loop.run_until_complete(worker_session.close())
atexit.register(_cleanup_session)


def _download_time_based_json(client, filename, params):
"""Performs time-based downloads using the JSON API."""
async def _download_time_based_json_async(session, filename, params):
"""Performs time-based downloads using the JSON API via aiohttp."""
total_bytes_downloaded = 0
bucket = client.bucket(params.bucket_name)
blob = bucket.blob(filename)

offset = 0
is_warming_up = True
start_time = time.monotonic()
Expand All @@ -89,17 +106,20 @@ def _download_time_based_json(client, filename, params):
total_bytes_downloaded = 0 # Reset counter after warmup

bytes_in_iteration = 0
# For JSON, we can't batch ranges like gRPC, so we download one by one
for _ in range(params.num_ranges):
if params.pattern == "rand":
offset = random.randint(
0, params.file_size_bytes - params.chunk_size_bytes
)

data = blob.download_as_bytes(
start=offset, end=offset + params.chunk_size_bytes - 1
)
bytes_in_iteration += len(data)
url = f"https://storage.googleapis.com/storage/v1/b/{params.bucket_name}/o/{filename}?alt=media"
headers = {
"Authorization": f"Bearer {token}",
"Range": f"bytes={offset}-{offset + params.chunk_size_bytes - 1}",
}
async with session.get(url, headers=headers) as response:
data = await response.read()
bytes_in_iteration += len(data)

if params.pattern == "seq":
offset += params.chunk_size_bytes
Expand Down Expand Up @@ -168,7 +188,9 @@ def _download_files_worker(process_idx, filename, params, bucket_type):
_download_time_based_async(worker_client, filename, params)
)
else: # regional
return _download_time_based_json(worker_json_client, filename, params)
return worker_loop.run_until_complete(
_download_time_based_json_async(worker_session, filename, params)
)


def download_files_mp_mc_wrapper(pool, files_names, params, bucket_type):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import os
from typing import Dict, List

import yaml

try:
from tests.perf.microbenchmarks.time_based.reads_regional.parameters import (
TimeBasedReadParameters,
)
except ModuleNotFoundError:
from reads_regional.parameters import TimeBasedReadParameters


def _get_params() -> Dict[str, List[TimeBasedReadParameters]]:
    """Generates a dictionary of benchmark parameters for time based read operations.

    Loads ``config.yaml`` from the directory containing this module and, for
    every entry under ``workload``, builds one ``TimeBasedReadParameters`` per
    combination of read type, file size, chunk size, number of ranges,
    process count and coroutine count.

    The bucket name is taken from the ``DEFAULT_STANDARD_BUCKET`` environment
    variable when set, otherwise from the ``defaults`` section of the config.

    Returns:
        A dict mapping each workload name to the list of parameter sets
        generated for that workload.

    Raises:
        OSError: If ``config.yaml`` cannot be opened.
        KeyError: If a required key is missing from the loaded config.
    """
    params: Dict[str, List[TimeBasedReadParameters]] = {}
    config_path = os.path.join(os.path.dirname(__file__), "config.yaml")
    # An explicit encoding keeps config parsing consistent across platforms
    # and locales instead of depending on the process default.
    with open(config_path, "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)

    common_params = config["common"]
    read_types = common_params["read_types"]
    file_sizes = common_params["file_sizes"]
    chunk_sizes_kib = common_params["chunk_sizes_kib"]
    num_ranges = common_params["num_ranges"]
    rounds = common_params["rounds"]
    duration = common_params["duration"]
    warmup_duration = common_params["warmup_duration"]
    num_downloads_after_open = common_params["num_downloads_after_open"]
    ignore_first_download = common_params["ignore_first_download"]

    # All read types use the same regional bucket
    bucket_name = os.environ.get(
        "DEFAULT_STANDARD_BUCKET", config["defaults"]["DEFAULT_STANDARD_BUCKET"]
    )

    for workload in config["workload"]:
        workload_name = workload["name"]
        params[workload_name] = []
        pattern = workload["pattern"]
        processes = workload["processes"]
        coros = workload["coros"]

        # Create a product of all parameter combinations
        product = itertools.product(
            read_types,
            file_sizes,
            chunk_sizes_kib,
            num_ranges,
            processes,
            coros,
        )

        for (
            read_type,
            file_size,
            chunk_size_kib,
            num_ranges_val,
            num_processes,
            num_coros,
        ) in product:
            # File sizes are configured in bytes; chunk sizes in KiB.
            file_size_bytes = file_size
            chunk_size_bytes = chunk_size_kib * 1024

            # One file per worker process.
            num_files = num_processes

            # Create a descriptive name for the parameter set
            name = f"{pattern}_{read_type}_{num_processes}p_{num_coros}c_{file_size / (1024 * 1024)}MiB_{chunk_size_kib}KiB_{num_ranges_val}ranges"

            params[workload_name].append(
                TimeBasedReadParameters(
                    name=name,
                    workload_name=workload_name,
                    pattern=pattern,
                    bucket_name=bucket_name,
                    bucket_type="regional",
                    read_type=read_type,
                    num_coros=num_coros,
                    num_processes=num_processes,
                    num_files=num_files,
                    rounds=rounds,
                    chunk_size_bytes=chunk_size_bytes,
                    file_size_bytes=file_size_bytes,
                    duration=duration,
                    warmup_duration=warmup_duration,
                    num_ranges=num_ranges_val,
                    num_downloads_after_open=num_downloads_after_open,
                    ignore_first_download=ignore_first_download,
                )
            )
    return params
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Settings shared by every workload; each list-valued key is expanded into
# the cartesian product of benchmark parameter sets.
common:
  read_types:
    - "async_json"
    - "async_grpc_dp"
    - "async_grpc_cp"
  file_sizes:
    - 10737418240 # 10GiB in bytes
  chunk_sizes_kib: [64]
  num_ranges: [1]
  rounds: 1
  num_downloads_after_open: 3
  ignore_first_download: true
  duration: 30 # seconds
  warmup_duration: 5 # seconds

workload:
  ############# multi process multi coroutine #########
  - name: "read_seq_multi_process"
    pattern: "seq"
    coros: [1]
    processes: [96]

  - name: "read_rand_multi_process"
    pattern: "rand"
    coros: [1, 16]
    processes: [1]

  - name: "read_whole_multi_process"
    pattern: "whole"
    coros: [1]
    processes: [1]

# Fallback values used when the matching environment variable is not set.
defaults:
  DEFAULT_STANDARD_BUCKET: "chandrasiri-benchmarks-rb"
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass

from tests.perf.microbenchmarks.parameters import IOBenchmarkParameters


@dataclass
class TimeBasedReadParameters(IOBenchmarkParameters):
    """Parameters describing one time-based read benchmark case.

    Extends ``IOBenchmarkParameters`` with the fields specific to time-based
    read workloads.
    """

    # Access pattern of the workload (the benchmark config uses "seq",
    # "rand" and "whole").
    pattern: str
    # How long the measured phase runs — presumably seconds, per the
    # benchmark config comments; confirm against the runner.
    duration: int
    # How long the warm-up phase runs before measurement — presumably
    # seconds as well.
    warmup_duration: int
    # Number of ranges requested per download iteration.
    num_ranges: int
    # Client/API variant used for the reads (e.g. "async_json",
    # "async_grpc_dp", "async_grpc_cp").
    read_type: str
    # Number of downloads performed after an object is opened.
    num_downloads_after_open: int
    # Whether the first download after open is excluded from the results.
    ignore_first_download: bool
Loading
Loading