Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions be/src/core/column/column_variant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1899,6 +1899,96 @@ void ColumnVariant::serialize_one_row_to_json_format(
output.write_char('}');
}

void ColumnVariant::serialize_one_row_flattened_to_string(
int64_t row, BufferWritable& output, const DataTypeSerDe::FormatOptions& options) const {
// Root-visible rows hold the whole value as a single scalar/array leaf; emit as-is.
if (is_visible_root_value(row)) {
subcolumns.get_root()->data.serialize_text_json(row, output, options);
return;
}

// Sub-column / sparse / doc-snapshot channels all key by dot-paths and are disjoint
// per row. Union their live paths, sort for determinism, emit "path":value for each.
std::vector<std::string> sorted_paths;
std::unordered_map<std::string, const Subcolumn*> subcolumn_by_path;
std::unordered_map<std::string, size_t> sparse_index_by_path;
std::unordered_map<std::string, size_t> doc_index_by_path;
sorted_paths.reserve(subcolumns.size());

for (const auto& subcolumn : get_subcolumns()) {
if (subcolumn->data.is_root) {
continue;
}
if (subcolumn->data.is_empty_nested(row)) {
continue;
}
if (subcolumn->data.is_null_at(row)) {
continue;
}
sorted_paths.emplace_back(subcolumn->path.get_path());
subcolumn_by_path.emplace(sorted_paths.back(), &subcolumn->data);
}

const auto& sparse_map = assert_cast<const ColumnMap&>(*serialized_sparse_column);
const auto& sparse_offsets = sparse_map.get_offsets();
const auto [sparse_paths_col, sparse_values_col] = get_sparse_data_paths_and_values();
for (size_t i = sparse_offsets[static_cast<ssize_t>(row) - 1];
i < sparse_offsets[static_cast<ssize_t>(row)]; ++i) {
auto ref = sparse_paths_col->get_data_at(i);
std::string path(ref.data, ref.size);
sparse_index_by_path.emplace(path, i);
sorted_paths.emplace_back(std::move(path));
}

const auto& doc_map = assert_cast<const ColumnMap&>(*serialized_doc_value_column);
const auto& doc_offsets = doc_map.get_offsets();
const auto& doc_keys = assert_cast<const ColumnString&>(doc_map.get_keys());
const auto& doc_values = assert_cast<const ColumnString&>(doc_map.get_values());
for (size_t i = doc_offsets[static_cast<ssize_t>(row) - 1];
i < doc_offsets[static_cast<ssize_t>(row)]; ++i) {
auto ref = doc_keys.get_data_at(i);
std::string path(ref.data, ref.size);
doc_index_by_path.emplace(path, i);
sorted_paths.emplace_back(std::move(path));
}

std::sort(sorted_paths.begin(), sorted_paths.end());

output.write_char('{');
DataTypeSerDe::FormatOptions opts = options;
opts.escape_char = '\\';
bool first = true;
for (const auto& path : sorted_paths) {
if (!first) {
output.write_char(',');
}
first = false;
output.write_json_string(path);
output.write_c_string(":");
if (auto sub_it = subcolumn_by_path.find(path); sub_it != subcolumn_by_path.end()) {
sub_it->second->serialize_text_json(row, output, opts);
} else if (auto sparse_it = sparse_index_by_path.find(path);
sparse_it != sparse_index_by_path.end()) {
Subcolumn tmp_subcolumn(0, true);
tmp_subcolumn.deserialize_from_binary_column(sparse_values_col, sparse_it->second);
tmp_subcolumn.serialize_text_json(0, output, opts);
} else if (auto doc_it = doc_index_by_path.find(path); doc_it != doc_index_by_path.end()) {
// Doc-snapshot values share the sparse binary encoding — deserialize
// before re-serializing as JSON text.
Subcolumn tmp_subcolumn(0, true);
tmp_subcolumn.deserialize_from_binary_column(&doc_values,
static_cast<ssize_t>(doc_it->second));
tmp_subcolumn.serialize_text_json(0, output, opts);
} else {
// Invariant: sorted_paths is the union of the three maps' keys.
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"variant_flatten: path '{}' at row {} missing from all channels",
path, row);
}
}
output.write_char('}');
}

size_t ColumnVariant::Subcolumn::get_non_null_value_size() const {
size_t res = 0;
for (const auto& part : data) {
Expand Down
5 changes: 5 additions & 0 deletions be/src/core/column/column_variant.h
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,11 @@ class ColumnVariant final : public COWHelper<IColumn, ColumnVariant> {
void serialize_one_row_to_json_format(int64_t row, BufferWritable& output, bool* is_null,
const DataTypeSerDe::FormatOptions& options) const;

// Serialize one row as a flat JSON object keyed by dot-joined paths.
// Example: {"a":{"b":2}} -> {"a.b":2}. Arrays stay opaque (keep-arrays).
void serialize_one_row_flattened_to_string(int64_t row, BufferWritable& output,
const DataTypeSerDe::FormatOptions& options) const;

void serialize_from_doc_value_to_json_format(int64_t row, BufferWritable& output,
bool* is_null) const;

Expand Down
81 changes: 81 additions & 0 deletions be/src/exprs/function/function_variant_flatten.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "core/assert_cast.h"
#include "core/column/column_string.h"
#include "core/column/column_variant.h"
#include "core/data_type/data_type_string.h"
#include "core/string_buffer.hpp"
#include "exprs/function/simple_function_factory.h"

namespace doris {
class FunctionContext;
} // namespace doris

namespace doris {

// variant_flatten(v VARIANT) -> STRING
// Flattens a nested Variant value into a single-level JSON object whose keys
// are the dot-joined paths to each leaf (NiFi FlattenJson "keep-arrays" mode):
// {"a":{"b":2}} -> {"a.b":2}
// {"a":{"b":{"c":3}}} -> {"a.b.c":3}
// {"a":[{"b":1}]} -> {"a":[{"b":1}]} // arrays are opaque leaves
//
// Returns STRING (not VARIANT) so the result survives being written back to a
// Variant column without being re-structured. Nullability propagates from the
// input via the framework's default null-handling.
//
// The heavy lifting lives in ColumnVariant::serialize_one_row_flattened_to_string,
// which walks the variant's typed sub-columns, sparse paths and doc-snapshot paths
// directly — no reparse. Array-of-objects inputs land as a single typed sub-column
// holding the whole array, so the flat walk naturally preserves keep-arrays
// semantics for them.
class FunctionVariantFlatten : public IFunction {
public:
static constexpr auto name = "variant_flatten";
static FunctionPtr create() { return std::make_shared<FunctionVariantFlatten>(); }

String get_name() const override { return name; }

size_t get_number_of_arguments() const override { return 1; }

DataTypePtr get_return_type_impl(const DataTypes& /*arguments*/) const override {
return std::make_shared<DataTypeString>();
}

Status execute_impl(FunctionContext* /*context*/, Block& block, const ColumnNumbers& arguments,
uint32_t result, size_t input_rows_count) const override {
const auto& variant_col =
assert_cast<const ColumnVariant&>(*block.get_by_position(arguments[0]).column);
auto result_col = ColumnString::create();
VectorBufferWriter writer(*result_col);
const DataTypeSerDe::FormatOptions fopts {};
for (size_t r = 0; r < input_rows_count; ++r) {
variant_col.serialize_one_row_flattened_to_string(static_cast<int64_t>(r), writer,
fopts);
writer.commit();
}
block.replace_by_position(result, std::move(result_col));
return Status::OK();
}
};

void register_function_variant_flatten(SimpleFunctionFactory& factory) {
factory.register_function<FunctionVariantFlatten>();
}

} // namespace doris
2 changes: 2 additions & 0 deletions be/src/exprs/function/simple_function_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ void register_function_dict_get_many(SimpleFunctionFactory& factory);
void register_function_ai(SimpleFunctionFactory& factory);
void register_function_score(SimpleFunctionFactory& factory);
void register_function_variant_type(SimpleFunctionFactory& factory);
void register_function_variant_flatten(SimpleFunctionFactory& factory);
void register_function_binary(SimpleFunctionFactory& factory);
void register_function_soundex(SimpleFunctionFactory& factory);

Expand Down Expand Up @@ -363,6 +364,7 @@ class SimpleFunctionFactory {
register_function_throw_exception(instance);
#endif
register_function_variant_type(instance);
register_function_variant_flatten(instance);
});
return instance;
}
Expand Down
Loading
Loading