Skip to content

Topn lazy materialize poc #50585

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/actions/action-pr-title
Submodule action-pr-title updated 548 files
2 changes: 1 addition & 1 deletion .github/actions/get-workflow-origin
2 changes: 1 addition & 1 deletion .github/actions/setup-maven
Submodule setup-maven updated 1325 files
2 changes: 1 addition & 1 deletion be/src/apache-orc
2 changes: 1 addition & 1 deletion be/src/clucene
1 change: 1 addition & 0 deletions be/src/common/consts.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const std::string CSV_WITH_NAMES_AND_TYPES = "csv_with_names_and_types";
const std::string BLOCK_TEMP_COLUMN_PREFIX = "__TEMP__";
const std::string BLOCK_TEMP_COLUMN_SCANNER_FILTERED = "__TEMP__scanner_filtered";
const std::string ROWID_COL = "__DORIS_ROWID_COL__";
const std::string GLOBAL_ROWID_COL = "__DORIS_GLOBAL_ROWID_COL__";
const std::string ROW_STORE_COL = "__DORIS_ROW_STORE_COL__";
const std::string DYNAMIC_COLUMN_NAME = "__DORIS_DYNAMIC_COL__";
const std::string PARTIAL_UPDATE_AUTO_INC_COL = "__PARTIAL_UPDATE_AUTO_INC_COLUMN__";
Expand Down
361 changes: 352 additions & 9 deletions be/src/exec/rowid_fetcher.cpp

Large diffs are not rendered by default.

47 changes: 47 additions & 0 deletions be/src/exec/rowid_fetcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include "common/status.h"
#include "exec/tablet_info.h" // DorisNodesInfo
#include "olap/id_manager.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type.h"

Expand All @@ -36,6 +37,11 @@ class DorisNodesInfo;
class RuntimeState;
class TupleDescriptor;

struct FileMapping;
struct IteratorKey;
struct IteratorItem;
struct HashOfIteratorKey;

namespace vectorized {
template <typename T>
class ColumnStr;
Expand Down Expand Up @@ -70,9 +76,50 @@ class RowIDFetcher {
FetchOption _fetch_option;
};

struct RowStoreReadStruct {
RowStoreReadStruct(std::string& buffer) : row_store_buffer(buffer) {};
std::string& row_store_buffer;
vectorized::DataTypeSerDeSPtrs serdes;
std::unordered_map<uint32_t, uint32_t> col_uid_to_idx;
std::vector<std::string> default_values;
};

class RowIdStorageReader {
public:
static Status read_by_rowids(const PMultiGetRequest& request, PMultiGetResponse* response);
static Status read_by_rowids(const PMultiGetRequestV2& request, PMultiGetResponseV2* response);

private:
static Status read_doris_format_row(
const std::shared_ptr<IdFileMap>& id_file_map,
const std::shared_ptr<FileMapping>& file_mapping, int64_t row_id,
std::vector<SlotDescriptor>& slots, const TabletSchema& full_read_schema,
RowStoreReadStruct& row_store_read_struct, OlapReaderStatistics& stats,
int64_t* acquire_tablet_ms, int64_t* acquire_rowsets_ms, int64_t* acquire_segments_ms,
int64_t* lookup_row_data_ms,
std::unordered_map<IteratorKey, IteratorItem, HashOfIteratorKey>& iterator_map,
vectorized::Block& result_block);

static Status read_batch_doris_format_row(
const PRequestBlockDesc& request_block_desc, std::shared_ptr<IdFileMap> id_file_map,
const TUniqueId& query_id, vectorized::Block& result_block, OlapReaderStatistics& stats,
int64_t* acquire_tablet_ms, int64_t* acquire_rowsets_ms, int64_t* acquire_segments_ms,
int64_t* lookup_row_data_ms);

static Status read_batch_external_row(const PRequestBlockDesc& request_block_desc,
std::shared_ptr<IdFileMap> id_file_map,
std::shared_ptr<FileMapping> first_file_mapping,
const TUniqueId& query_id,
vectorized::Block& result_block, int64_t* init_reader_ms,
int64_t* get_block_ms);
};

template <typename Func>
auto scope_timer_run(Func fn, int64_t* cost) -> decltype(fn()) {
MonotonicStopWatch watch;
watch.start();
auto res = fn();
*cost += watch.elapsed_time() / 1000 / 1000;
return res;
}
} // namespace doris
3 changes: 2 additions & 1 deletion be/src/olap/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/olap")
set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/olap")

file(GLOB_RECURSE SRC_FILES CONFIGURE_DEPENDS *.cpp)
add_library(Olap STATIC ${SRC_FILES})
add_library(Olap STATIC ${SRC_FILES}
id_manager.h)

if (NOT USE_MEM_TRACKER)
target_compile_options(Olap PRIVATE -Wno-unused-lambda-capture)
Expand Down
263 changes: 263 additions & 0 deletions be/src/olap/id_manager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <butil/macros.h>
#include <gen_cpp/BackendService_types.h>
#include <gen_cpp/Types_types.h>
#include <stddef.h>
#include <stdint.h>

#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <shared_mutex>
#include <string>
#include <string_view>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "common/status.h"
#include "olap/olap_common.h"
#include "olap/tablet.h"
#include "olap/tablet_meta.h"

namespace doris {

enum class FileMappingType {
INTERNAL, // for doris format file {tablet_id}{rowset_id}{segment_id}
EXTERNAL, // for external table.
};

struct InternalFileMappingInfo {
int64_t tablet_id;
RowsetId rowset_id;
uint32_t segment_id;

std::string to_string() const {
std::string value;
value.resize(sizeof(tablet_id) + sizeof(rowset_id) + sizeof(segment_id));
auto* ptr = value.data();

memcpy(ptr, &tablet_id, sizeof(tablet_id));
ptr += sizeof(tablet_id);
memcpy(ptr, &rowset_id, sizeof(rowset_id));
ptr += sizeof(rowset_id);
memcpy(ptr, &segment_id, sizeof(segment_id));
return value;
}
};

struct ExternalFileMappingInfo {
/* By recording the plan_node_id in fileMapping, the TFileScanRangeParams used in the scan phase can be found
* from QueryContext according to the plan_node_id. Because there are some important information in
* TFileScanRangeParams (needed when creating hdfs/s3 reader):
* 8: optional THdfsParams hdfs_params;
* 9: optional map<string, string> properties;
*/
int plan_node_id;

/*
* Record TFileRangeDesc external_scan_range_desc in fileMapping, usage:
* 1. If the file belongs to a partition, columns_from_path_keys and columns_from_path in TFileRangeDesc are needed when materializing the partition column
* 2. path, file_type, modification_time,compress_type .... used to read the file
* 3. TFileFormatType can distinguish whether it is iceberg/hive/hudi/paimon
*/
TFileRangeDesc scan_range_desc;
bool enable_file_meta_cache;

ExternalFileMappingInfo(int plan_node_id, const TFileRangeDesc& scan_range,
bool file_meta_cache)
: plan_node_id(plan_node_id),
scan_range_desc(scan_range),
enable_file_meta_cache(file_meta_cache) {}

std::string to_string() const {
std::string value;
value.resize(scan_range_desc.path.size() + sizeof(plan_node_id) +
sizeof(scan_range_desc.start_offset));
auto* ptr = value.data();

memcpy(ptr, &plan_node_id, sizeof(plan_node_id));
ptr += sizeof(plan_node_id);
memcpy(ptr, &scan_range_desc.start_offset, sizeof(scan_range_desc.start_offset));
ptr += sizeof(scan_range_desc.start_offset);
memcpy(ptr, scan_range_desc.path.data(), scan_range_desc.path.size());
return value;
}
};

struct FileMapping {
ENABLE_FACTORY_CREATOR(FileMapping);

FileMappingType type;
std::variant<InternalFileMappingInfo, ExternalFileMappingInfo> value;

FileMapping(int64_t tablet_id, RowsetId rowset_id, uint32_t segment_id)
: type(FileMappingType::INTERNAL),
value(std::in_place_type<InternalFileMappingInfo>, tablet_id, rowset_id, segment_id) {
}

FileMapping(int plan_node_id, const TFileRangeDesc& scan_range, bool enable_file_meta_cache)
: type(FileMappingType::EXTERNAL),
value(std::in_place_type<ExternalFileMappingInfo>, plan_node_id, scan_range,
enable_file_meta_cache) {}

std::tuple<int64_t, RowsetId, uint32_t> get_doris_format_info() const {
DCHECK(type == FileMappingType::INTERNAL);
auto info = std::get<InternalFileMappingInfo>(value);
return std::make_tuple(info.tablet_id, info.rowset_id, info.segment_id);
}

ExternalFileMappingInfo& get_external_file_info() {
DCHECK(type == FileMappingType::EXTERNAL);
return std::get<ExternalFileMappingInfo>(value);
}

static std::string file_mapping_info_to_string(
const std::variant<InternalFileMappingInfo, ExternalFileMappingInfo>& info) {
return std::visit(
[](const auto& info) -> std::string {
using T = std::decay_t<decltype(info)>;

if constexpr (std::is_same_v<T, InternalFileMappingInfo>) {
return info.to_string();

} else if constexpr (std::is_same_v<T, ExternalFileMappingInfo>) {
return info.to_string();
}
},
info);
}

std::string file_mapping_info_to_string() { return file_mapping_info_to_string(value); }
};

class IdFileMap {
public:
IdFileMap(uint64_t expired_timestamp) : delayed_expired_timestamp(expired_timestamp) {}

std::shared_ptr<FileMapping> get_file_mapping(uint32_t id) {
std::shared_lock lock(_mtx);
auto it = _id_map.find(id);
if (it == _id_map.end()) {
return nullptr;
}
return it->second;
}

uint32 get_file_mapping_id(const std::shared_ptr<FileMapping>& mapping) {
DCHECK(mapping.get() != nullptr);
std::unique_lock lock(_mtx);
auto value = mapping->file_mapping_info_to_string();
auto it = _mapping_to_id.find(value);
if (it != _mapping_to_id.end()) {
return it->second;
}
_id_map[_init_id++] = mapping;
_mapping_to_id[value] = _init_id - 1;

return _init_id - 1;
}

void add_temp_rowset(const RowsetSharedPtr& rowset) {
std::unique_lock lock(_mtx);
_temp_rowset_maps[{rowset->rowset_meta()->tablet_id(), rowset->rowset_id()}] = rowset;
}

RowsetSharedPtr get_temp_rowset(const int64_t tablet_id, const RowsetId& rowset_id) {
std::shared_lock lock(_mtx);
auto it = _temp_rowset_maps.find({tablet_id, rowset_id});
if (it == _temp_rowset_maps.end()) {
return nullptr;
}
return it->second;
}

int64_t get_delayed_expired_timestamp() { return delayed_expired_timestamp; }

private:
std::shared_mutex _mtx;
uint32_t _init_id = 0;
std::unordered_map<std::string, uint32_t> _mapping_to_id;
std::unordered_map<uint32_t, std::shared_ptr<FileMapping>> _id_map;

// use in Doris Format to keep temp rowsets, preventing them from being deleted by compaction
std::unordered_map<std::pair<int64_t, RowsetId>, RowsetSharedPtr> _temp_rowset_maps;
uint64_t delayed_expired_timestamp = 0;
};

class IdManager {
public:
static constexpr uint8_t ID_VERSION = 0;

IdManager() = default;

~IdManager() {
std::unique_lock lock(_query_to_id_file_map_mtx);
_query_to_id_file_map.clear();
}

std::shared_ptr<IdFileMap> add_id_file_map(const UniqueId& query_id, int timeout) {
std::unique_lock lock(_query_to_id_file_map_mtx);
auto it = _query_to_id_file_map.find(query_id);
if (it == _query_to_id_file_map.end()) {
auto id_file_map = std::make_shared<IdFileMap>(UnixSeconds() + timeout);
_query_to_id_file_map[query_id] = id_file_map;
return id_file_map;
}
return it->second;
}

void gc_expired_id_file_map(int64_t now) {
std::unique_lock lock(_query_to_id_file_map_mtx);
for (auto it = _query_to_id_file_map.begin(); it != _query_to_id_file_map.end();) {
if (it->second->get_delayed_expired_timestamp() <= now) {
it = _query_to_id_file_map.erase(it);
} else {
++it;
}
}
}

void remove_id_file_map(const UniqueId& query_id) {
std::unique_lock lock(_query_to_id_file_map_mtx);
_query_to_id_file_map.erase(query_id);
}

std::shared_ptr<IdFileMap> get_id_file_map(const UniqueId& query_id) {
std::shared_lock lock(_query_to_id_file_map_mtx);
auto it = _query_to_id_file_map.find(query_id);
if (it == _query_to_id_file_map.end()) {
return nullptr;
}
return it->second;
}

private:
DISALLOW_COPY_AND_ASSIGN(IdManager);

phmap::flat_hash_map<UniqueId, std::shared_ptr<IdFileMap>> _query_to_id_file_map;
std::shared_mutex _query_to_id_file_map_mtx;
};

} // namespace doris
Loading
Loading