Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/en/sql-reference/statements/system.md
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,12 @@ Normally shuts down ClickHouse (like `service clickhouse-server stop` / `kill {$

Aborts ClickHouse process (like `kill -9 {$ pid_clickhouse-server}`)

## SYSTEM PRESHUTDOWN {#preshutdown}

<CloudNotSupportedBadge/>

Prepares the node for graceful shutdown: unregisters it from autodiscovered clusters and stops accepting distributed requests to object storages (s3Cluster, icebergCluster, etc.).

## SYSTEM INSTRUMENT {#instrument}

Manages instrumentation points using LLVM's XRay feature which is available when ClickHouse is built using `ENABLE_XRAY=1`.
Expand Down
4 changes: 4 additions & 0 deletions programs/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2607,6 +2607,8 @@ try

}

global_context->startSwarmMode();

{
std::lock_guard lock(servers_lock);
/// We should start interserver communications before (and more important shutdown after) tables.
Expand Down Expand Up @@ -3094,6 +3096,8 @@ try

is_cancelled = true;

global_context->stopSwarmMode();

LOG_DEBUG(log, "Waiting for current connections to close.");

size_t current_connections = 0;
Expand Down
1 change: 1 addition & 0 deletions src/Access/Common/AccessType.h
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ enum class AccessType : uint8_t
M(SYSTEM_TTL_MERGES, "SYSTEM STOP TTL MERGES, SYSTEM START TTL MERGES, STOP TTL MERGES, START TTL MERGES", TABLE, SYSTEM) \
M(SYSTEM_FETCHES, "SYSTEM STOP FETCHES, SYSTEM START FETCHES, STOP FETCHES, START FETCHES", TABLE, SYSTEM) \
M(SYSTEM_MOVES, "SYSTEM STOP MOVES, SYSTEM START MOVES, STOP MOVES, START MOVES", TABLE, SYSTEM) \
M(SYSTEM_SWARM, "SYSTEM STOP SWARM MODE, SYSTEM START SWARM MODE, STOP SWARM MODE, START SWARM MODE", GLOBAL, SYSTEM) \
M(SYSTEM_PULLING_REPLICATION_LOG, "SYSTEM STOP PULLING REPLICATION LOG, SYSTEM START PULLING REPLICATION LOG", TABLE, SYSTEM) \
M(SYSTEM_CLEANUP, "SYSTEM STOP CLEANUP, SYSTEM START CLEANUP", TABLE, SYSTEM) \
M(SYSTEM_VIEWS, "SYSTEM REFRESH VIEW, SYSTEM START VIEWS, SYSTEM STOP VIEWS, SYSTEM START VIEW, SYSTEM STOP VIEW, SYSTEM CANCEL VIEW, REFRESH VIEW, START VIEWS, STOP VIEWS, START VIEW, STOP VIEW, CANCEL VIEW", VIEW, SYSTEM) \
Expand Down
7 changes: 5 additions & 2 deletions src/Client/MultiplexedConnections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ void MultiplexedConnections::sendIgnoredPartUUIDs(const std::vector<UUID> & uuid
void MultiplexedConnections::sendClusterFunctionReadTaskResponse(const ClusterFunctionReadTaskResponse & response)
{
std::lock_guard lock(cancel_mutex);
if (cancelled)
if (cancelled || !current_connection || !current_connection->isConnected())
return;
current_connection->sendClusterFunctionReadTaskResponse(response);
}
Expand All @@ -241,7 +241,7 @@ void MultiplexedConnections::sendClusterFunctionReadTaskResponse(const ClusterFu
void MultiplexedConnections::sendMergeTreeReadTaskResponse(const ParallelReadResponse & response)
{
std::lock_guard lock(cancel_mutex);
if (cancelled)
if (cancelled || !current_connection || !current_connection->isConnected())
return;
current_connection->sendMergeTreeReadTaskResponse(response);
}
Expand Down Expand Up @@ -527,9 +527,12 @@ MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForRead

/// Drops the connection owned by the given replica slot.
/// Clears the slot, releases its pool entry and keeps the bookkeeping
/// (active connection counter, current_connection pointer) consistent.
void MultiplexedConnections::invalidateReplica(ReplicaState & state)
{
    Connection * dropped = state.connection;

    /// Reset the slot itself.
    state.connection = nullptr;
    state.pool_entry = IConnectionPool::Entry();

    /// One fewer replica available for reading.
    --active_connection_count;

    /// current_connection must never dangle: if it pointed at the
    /// connection we just dropped, forget it as well.
    if (current_connection == dropped)
        current_connection = nullptr;
}

void MultiplexedConnections::setAsyncCallback(AsyncCallback async_callback)
Expand Down
1 change: 1 addition & 0 deletions src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@
M(StartupScriptsExecutionState, "State of startup scripts execution: 0 = not finished, 1 = success, 2 = failure.") \
\
M(IsServerShuttingDown, "Indicates if the server is shutting down: 0 = no, 1 = yes") \
M(IsSwarmModeEnabled, "Indicates if the swarm mode enabled or not: 0 = disabled, 1 = enabled") \
\
M(StatelessWorkerThreads, "Number of threads in the stateless worker thread pool.") \
M(StatelessWorkerThreadsActive, "Number of threads in the stateless worker thread pool running a task.") \
Expand Down
11 changes: 11 additions & 0 deletions src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1377,6 +1377,17 @@ The server successfully detected this situation and will download merged part fr
M(RuntimeFilterRowsChecked, "Number of rows checked by JOIN Runtime Filters", ValueType::Number) \
M(RuntimeFilterRowsPassed, "Number of rows that passed (not filtered out by) JOIN Runtime Filters", ValueType::Number) \
M(RuntimeFilterRowsSkipped, "Number of rows in blocks that were skipped by JOIN Runtime Filters", ValueType::Number) \
\
M(ObjectStorageClusterSentToMatchedReplica, "Number of tasks in ObjectStorageCluster request sent to matched replica.", ValueType::Number) \
M(ObjectStorageClusterSentToNonMatchedReplica, "Number of tasks in ObjectStorageCluster request sent to non-matched replica.", ValueType::Number) \
M(ObjectStorageClusterProcessedTasks, "Number of processed tasks in ObjectStorageCluster request.", ValueType::Number) \
M(ObjectStorageClusterWaitingMicroseconds, "Time of waiting for tasks in ObjectStorageCluster request.", ValueType::Microseconds) \
M(ObjectStorageListObjectsCacheHits, "Number of times object storage list objects operation hit the cache.", ValueType::Number) \
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cache-related events are from different PR.

M(ObjectStorageListObjectsCacheMisses, "Number of times object storage list objects operation miss the cache.", ValueType::Number) \
M(ObjectStorageListObjectsCacheExactMatchHits, "Number of times object storage list objects operation hit the cache with an exact match.", ValueType::Number) \
M(ObjectStorageListObjectsCachePrefixMatchHits, "Number of times object storage list objects operation miss the cache using prefix matching.", ValueType::Number) \
M(ParquetMetaDataCacheHits, "Number of times the read from filesystem cache hit the cache.", ValueType::Number) \
M(ParquetMetaDataCacheMisses, "Number of times the read from filesystem cache miss the cache.", ValueType::Number)


#ifdef APPLY_FOR_EXTERNAL_EVENTS
Expand Down
2 changes: 2 additions & 0 deletions src/Core/Protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,10 @@ namespace Protocol
MergeTreeReadTaskRequest = 16, /// Request from a MergeTree replica to a coordinator
TimezoneUpdate = 17, /// Receive server's (session-wide) default timezone
SSHChallenge = 18, /// Return challenge for SSH signature signing

MAX = SSHChallenge,

ConnectionLost = 255, /// Exception that occurred on the client side.
};

/// NOTE: If the type of packet argument would be Enum, the comparison packet >= 0 && packet < 10
Expand Down
41 changes: 41 additions & 0 deletions src/Core/Range.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,21 @@
#include <Core/Range.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ReadBufferFromString.h>
#include <Common/FieldVisitorToString.h>
#include <Common/FieldAccurateComparison.h>
#include <Common/Base64.h>


namespace DB
{

namespace ErrorCodes
{
extern const int INCORRECT_DATA;
};


FieldRef::FieldRef(ColumnsWithTypeAndName * columns_, size_t row_idx_, size_t column_idx_)
: Field((*(*columns_)[column_idx_].column)[row_idx_]), columns(columns_), row_idx(row_idx_), column_idx(column_idx_)
{
Expand Down Expand Up @@ -151,6 +159,13 @@ bool Range::isInfinite() const
return left.isNegativeInfinity() && right.isPositiveInfinity();
}

/// Returns true if the range degenerates to a single value [x, x]:
/// both bounds are finite, both bounds are included, and they compare equal.
bool Range::isPoint() const
{
    if (!fullBounded() || !left_included || !right_included)
        return false;
    if (left.isNegativeInfinity() || left.isPositiveInfinity())
        return false;
    return equals(left, right);
}

bool Range::intersectsRange(const Range & r) const
{
/// r to the left of me.
Expand Down Expand Up @@ -276,6 +291,32 @@ bool Range::nearByWith(const Range & r) const
return false;
}

/// Dumps the range (inclusiveness flags followed by both bounds) into a
/// binary string. With base64 == true the payload is additionally
/// base64-encoded so it can travel as plain text.
String Range::serialize(bool base64) const
{
    WriteBufferFromOwnString out;

    out << left_included << right_included;
    writeFieldBinary(left, out);
    writeFieldBinary(right, out);

    String payload = out.str();
    return base64 ? base64Encode(payload) : payload;
}

void Range::deserialize(const String & range, bool base64)
{
if (range.empty())
throw Exception(ErrorCodes::INCORRECT_DATA, "Empty range dump");

ReadBufferFromOwnString str(base64 ? base64Decode(range) : range);

str >> left_included >> right_included;
left = readFieldBinary(str);
right = readFieldBinary(str);
}

Range intersect(const Range & a, const Range & b)
{
Range res = Range::createWholeUniverse();
Expand Down
5 changes: 5 additions & 0 deletions src/Core/Range.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ struct Range

bool isBlank() const;

bool isPoint() const;

bool intersectsRange(const Range & r) const;

bool containsRange(const Range & r) const;
Expand All @@ -114,6 +116,9 @@ struct Range
bool nearByWith(const Range & r) const;

String toString() const;

String serialize(bool base64 = false) const;
void deserialize(const String & range, bool base64 = false);
};

Range intersect(const Range & a, const Range & b);
Expand Down
19 changes: 18 additions & 1 deletion src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7665,6 +7665,19 @@ Default number of tasks for parallel reading in distributed query. Tasks are spr
DECLARE(Bool, distributed_plan_optimize_exchanges, true, R"(
Removes unnecessary exchanges in distributed query plan. Disable it for debugging.
)", 0) \
DECLARE(UInt64, lock_object_storage_task_distribution_ms, 500, R"(
In object storage distributed queries, do not distribute tasks to non-prefetched nodes while the prefetched node is active.
Determines how long a free executor node (one that has finished processing all of its assigned tasks) should wait before "stealing" tasks from the queues of currently busy executor nodes.

Possible values:

- 0 - steal tasks immediately after freeing up.
- >0 - wait for specified period of time before stealing tasks.

Setting this `>0` helps with cache reuse and might improve overall query time:
a busy node might have warmed-up caches for a specific task, while a free node would need to fetch a lot of data from S3,
which might take longer than simply waiting for the busy node and would generate extra traffic.
)", EXPERIMENTAL) \
DECLARE(String, distributed_plan_force_exchange_kind, "", R"(
Force specified kind of Exchange operators between distributed query stages.

Expand Down Expand Up @@ -7712,6 +7725,9 @@ If the number of set bits in a runtime bloom filter exceeds this ratio the filte
)", EXPERIMENTAL) \
DECLARE(Bool, rewrite_in_to_join, false, R"(
Rewrite expressions like 'x IN subquery' to JOIN. This might be useful for optimizing the whole query with join reordering.
)", EXPERIMENTAL) \
DECLARE(Bool, allow_experimental_iceberg_read_optimization, true, R"(
Allow Iceberg read optimization based on Iceberg metadata.
)", EXPERIMENTAL) \
\
/** Experimental timeSeries* aggregate functions. */ \
Expand Down Expand Up @@ -7880,7 +7896,8 @@ Maximum number of WebAssembly UDF instances that can run in parallel per functio
MAKE_OBSOLETE(M, Bool, describe_extend_object_types, false) \
MAKE_OBSOLETE(M, Bool, allow_experimental_object_type, false) \
MAKE_OBSOLETE(M, BoolAuto, insert_select_deduplicate, Field{"auto"}) \
MAKE_OBSOLETE(M, Bool, use_text_index_dictionary_cache, false)
MAKE_OBSOLETE(M, Bool, use_text_index_dictionary_cache, false) \
MAKE_OBSOLETE(M, Bool, allow_retries_in_cluster_requests, false)
/** The section above is for obsolete settings. Do not add anything there. */
#endif /// __CLION_IDE__

Expand Down
24 changes: 24 additions & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,30 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
// {"export_merge_tree_part_throw_on_pending_patch_parts", true, true, "New setting."},
// {"object_storage_cluster", "", "", "Antalya: New setting"},
// {"object_storage_max_nodes", 0, 0, "Antalya: New setting"},
{"allow_experimental_iceberg_read_optimization", true, true, "New setting."},
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of uncommented settings are not from ported PR.

{"object_storage_cluster_join_mode", "allow", "allow", "New setting"},
{"lock_object_storage_task_distribution_ms", 500, 500, "New setting."},
{"allow_retries_in_cluster_requests", false, false, "New setting"},
// {"object_storage_remote_initiator", false, false, "New setting."},
{"allow_experimental_export_merge_tree_part", false, true, "Turned ON by default for Antalya."},
{"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."},
{"export_merge_tree_partition_force_export", false, false, "New setting."},
{"export_merge_tree_partition_max_retries", 3, 3, "New setting."},
{"export_merge_tree_partition_manifest_ttl", 180, 180, "New setting."},
{"export_merge_tree_part_file_already_exists_policy", "skip", "skip", "New setting."},
// {"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."},
{"hybrid_table_auto_cast_columns", true, true, "New setting to automatically cast Hybrid table columns when segments disagree on types. Default enabled."},
{"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."},
{"enable_alias_marker", true, true, "New setting."},
// {"input_format_parquet_use_native_reader_v3", false, true, "Seems stable"},
// {"input_format_parquet_verify_checksums", true, true, "New setting."},
// {"output_format_parquet_write_checksums", false, true, "New setting."},
{"export_merge_tree_part_max_bytes_per_file", 0, 0, "New setting."},
{"export_merge_tree_part_max_rows_per_file", 0, 0, "New setting."},
// {"cluster_table_function_split_granularity", "file", "file", "New setting."},
// {"cluster_table_function_buckets_batch_size", 0, 0, "New setting."},
{"export_merge_tree_part_throw_on_pending_mutations", true, true, "New setting."},
{"export_merge_tree_part_throw_on_pending_patch_parts", true, true, "New setting."},
});
addSettingsChanges(settings_changes_history, "25.8",
{
Expand Down
56 changes: 56 additions & 0 deletions src/Disks/DiskObjectStorage/ObjectStorages/IObjectStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@
#include <Common/Exception.h>
#include <Common/ObjectStorageKeyGenerator.h>
#include <IO/WriteBufferFromString.h>
#include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>

#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <Poco/JSON/JSONException.h>


namespace DB
Expand Down Expand Up @@ -102,4 +107,55 @@ WriteSettings IObjectStorage::patchSettings(const WriteSettings & write_settings
return write_settings;
}

/// Builds a RelativePathWithMetadata from a data-file descriptor,
/// optionally attaching already-known object metadata.
RelativePathWithMetadata::RelativePathWithMetadata(const DataFileInfo & info, std::optional<ObjectMetadata> metadata_)
    : metadata(std::move(metadata_))
{
    /// Copy the path and the (possibly null) per-file meta info from the descriptor.
    file_meta_info = info.file_meta_info;
    relative_path = info.file_path;
}

/// Tries to interpret a task string as a JSON command object
/// ({"file_path": ..., "retry_after_us": ..., "meta_info": ...}).
/// If the string is not such a JSON object the response stays invalid
/// (is_valid == false) and the task should be treated as a plain path.
RelativePathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std::string & task)
{
    Poco::JSON::Parser parser;
    try
    {
        auto json = parser.parse(task).extract<Poco::JSON::Object::Ptr>();
        if (!json)
            return;

        if (json->has("file_path"))
            file_path = json->getValue<std::string>("file_path");
        if (json->has("retry_after_us"))
            retry_after_us = json->getValue<size_t>("retry_after_us");
        if (json->has("meta_info"))
            file_meta_info = std::make_shared<DataFileMetaInfo>(json->getObject("meta_info"));

        /// Mark valid only after every field parsed successfully, so a
        /// partially-read command is never reported as valid.
        is_valid = true;
    }
    catch (const Poco::Exception &)
    {
        /// Not a command. The original narrow catches (JSONException,
        /// SyntaxException) let Poco::BadCastException (valid JSON that is
        /// not an object, e.g. "42" or "[1]") and getValue() type-mismatch
        /// exceptions escape; catch the common base instead. Roll back any
        /// partially-assigned fields.
        file_path.reset();
        retry_after_us.reset();
        file_meta_info.reset();
        is_valid = false;
    }
}

/// Serializes the command back into its JSON text form.
/// Only the fields that are actually set are emitted.
std::string RelativePathWithMetadata::CommandInTaskResponse::toString() const
{
    Poco::JSON::Object json;

    if (file_path)
        json.set("file_path", *file_path);
    if (retry_after_us)
        json.set("retry_after_us", *retry_after_us);
    if (file_meta_info)
        json.set("meta_info", (*file_meta_info)->toJson());

    std::ostringstream out;
    /// Surface stream failures as exceptions instead of silently
    /// producing a truncated dump.
    out.exceptions(std::ios::failbit);
    Poco::JSON::Stringifier::stringify(json, out);
    return out.str();
}

}
Loading
Loading