Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions src/core/linear_search_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ class LinearSearchMap : public absl::InlinedVector<std::pair<Key, Value>, N> {

Value& operator[](const Key& key);
const Value& operator[](const Key& key) const;

std::vector<Key> keys() const;
};

// Implementation
Expand Down Expand Up @@ -111,14 +109,4 @@ const Value& LinearSearchMap<Key, Value, N>::operator[](const Key& key) const {
return find(key)->second;
}

template <typename Key, typename Value, size_t N>
std::vector<Key> LinearSearchMap<Key, Value, N>::keys() const {
std::vector<Key> keys;
keys.reserve(this->size());
for (const auto& pair : *this) {
keys.push_back(pair.first);
}
return keys;
}

} // namespace dfly
5 changes: 3 additions & 2 deletions src/server/search/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ if (NOT WITH_SEARCH)
return()
endif()

add_library(dfly_search_server aggregator.cc doc_accessors.cc doc_index.cc search_family.cc)
add_library(dfly_search_server aggregator.cc doc_accessors.cc doc_index.cc search_family.cc index_join.cc)
target_link_libraries(dfly_search_server dfly_transaction dragonfly_lib dfly_facade redis_lib jsonpath TRDP::jsoncons)


cxx_test(search_family_test dfly_test_lib LABELS DFLY)
cxx_test(aggregator_test dfly_test_lib LABELS DFLY)
cxx_test(index_join_test dfly_test_lib LABELS DFLY)


add_dependencies(check_dfly search_family_test aggregator_test)
add_dependencies(check_dfly search_family_test aggregator_test index_join_test)
145 changes: 145 additions & 0 deletions src/server/search/index_join.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright 2025, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#include "server/search/index_join.h"

namespace dfly::join {

namespace {

using KeyIndex = size_t;
using KeyIndexes = Vector<KeyIndex>;

// Joins joined_entries with new index entries using join_expressions.
// It uses hash joining algorithm to find matching entries.
std::vector<KeyIndexes> JoinWithNewIndex(
EntriesPerIndex indexes_entries, absl::Span<const KeyIndexes> joined_entries,
size_t new_index, // represented as index in indexes_entries
absl::Span<const JoinExpression> join_expressions) {
/* We fill join_map with values sets from joined entries.
In join_map we store {set of field values} to indexes in joined_entries that match this set of
field values. So, then we can go over new_index entries and match their values with
joined_entries using this.
TODO: use hash map for the smallest set (new_index or joined_entries) */
using ValuesSet = Vector<JoinableValue>;
using JoinEntriesIndexes = absl::InlinedVector<size_t, 1>;
absl::flat_hash_map<ValuesSet, JoinEntriesIndexes> join_map;
join_map.reserve(joined_entries.size());

// Now we need to initialize join_map with values of joined entries.
for (size_t i = 0; i < joined_entries.size(); ++i) {
const auto& joined_entry_keys = joined_entries[i];

ValuesSet values_set;
values_set.reserve(join_expressions.size());

// Go over all join expressions and get field values using foreign index and field.
for (const auto& join_expression : join_expressions) {
size_t index = join_expression.foreign_index;
size_t field_index = join_expression.foreign_field;

// Now we need to get value of this field from joined key in this index
DCHECK_LT(index, joined_entry_keys.size()) << "Join order broken, index out of range";
KeyIndex key_index = joined_entry_keys[index];
const JoinableValue& field_value = indexes_entries[index][key_index].second[field_index];

// Add value to the set
values_set.push_back(field_value);
}

// That means that this set of values corresponds to joined entry i
join_map[values_set].push_back(i);
}

std::vector<KeyIndexes> result;
result.reserve(join_map.size());

// Now we store all possible sets of values in joined_entries that match this set.
// We can iterate over new index and find entries with the same set of values.
const auto& new_index_entries = indexes_entries[new_index];
for (size_t i = 0; i < new_index_entries.size(); ++i) {
const auto& index_entries = new_index_entries[i].second;

ValuesSet values_set;
values_set.reserve(join_expressions.size());
// Go over all join expressions and get field values for this entry
for (const auto& join_expression : join_expressions) {
const JoinableValue& field_value = index_entries[join_expression.field];
values_set.push_back(field_value);
}

// Now we need to find this set in the join_map
auto it = join_map.find(values_set);
if (it == join_map.end()) {
continue;
}

// This entry in new index matches some joined entries,
// we need to go over all entries with the same set of values
// and add them to the result
for (size_t joined_entry_index : it->second) {
result.push_back(joined_entries[joined_entry_index]);
// Add new index entry to the joined entry
result.back().push_back(i);
}
}

return result;
}

} // anonymous namespace

Vector<Vector<Key>> JoinAllIndexes(EntriesPerIndex indexes_entries, IndexesJoinExpressions joins) {
if (indexes_entries.empty()) {
return {};
}

// Will used to initialize joined entries
const auto& first_index_entries = indexes_entries[0];

/* Store current result of joins
Each entry is vector of indexes, that referce to one key in the index
For example, {1, 0, 4} means that key with index 1 in the first index,
key with index 0 in the second index and key with index 4 in the third index were joined to
single entry. */
std::vector<KeyIndexes> joined_entries(first_index_entries.size(), KeyIndexes(1));

// At the first step all keys from the first index are joined
for (size_t i = 0; i < first_index_entries.size(); ++i) {
joined_entries[i][0] = i;
}

DCHECK(joins[0].empty()) << "Base index must be first and have no joins";

/* Now we need to iterate over all indexes and the joins
Using joins for the new index, we will find matching entries in the current result
(joined_entries) with the entries in the new index. */
for (size_t i = 1; i < indexes_entries.size(); ++i) {
joined_entries = JoinWithNewIndex(indexes_entries, joined_entries, i, joins[i]);
}

const size_t result_size = joined_entries.size();
const size_t indexes_count = indexes_entries.size();
// Now we have joined entries, we need to build JoinResult
Vector<Vector<Key>> result(result_size, Vector<Key>(indexes_count));

for (size_t i = 0; i < result_size; ++i) {
auto& result_entry = result[i];

for (size_t index = 0; index < indexes_count; ++index) {
// Index of joined key in the current index
KeyIndex key_index = joined_entries[i][index];
// Find key by the key_index
const auto& key = indexes_entries[index][key_index].first;

// Add key to the result
// That means that this key from this index was joined
result_entry[index] = key;
}
}

return result;
}

} // namespace dfly::join
64 changes: 64 additions & 0 deletions src/server/search/index_join.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2025, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#pragma once

#include <vector>

#include "base/logging.h"
#include "core/linear_search_map.h"
#include "core/search/base.h"
#include "server/tx_base.h"

namespace dfly::join {

template <typename T> using Vector = absl::InlinedVector<T, 4>;

/* Represents field value.
Same as search::SortableValue, but do not have monostate and stores string_view instead of
std::string. */
using JoinableValue = std::variant<double, std::string_view>;

/* Each index has its own set of fields used for joins.
Additionally, each index contains multiple keys/documents it has indexed, and each document
includes several fields.

For example:
JOIN index2 ON index2.field1 = other_index.field2 AND index2.field3 = other_index.field4

So, index2 uses field1 and field3 for joins. It also indexed docs key1, key2, key3:
EntriesPerIndex will store something like:
[{"key1", {"field1" : value, "field3" : value}},
{"key2", {"field1" : value, "field3" : value}},
{"key3", {"field1" : value, "field3" : value}}].
But to make join algorithm more efficient, we store it as raw vectors,
instead of field_name as string, we use indexes;
instead of key names we use shard id and doc id.
*/
using Key = std::pair<ShardId, search::DocId>;
using Entry = std::pair<Key, Vector<JoinableValue> /*fields values of this key*/>;
using EntriesPerIndex = absl::Span<const Vector<Entry> /*one index can store several keys*/>;

// Stores data for single join expression,
// e.g. index1.field1 = index2.field2:
// field - "field1", foreign_index - "index2", foreign_field - "field2"
struct JoinExpression {
size_t field; // field is represented as index in the Entry.second array
size_t foreign_index; // foreign_index is represented as index in the EntriesPerIndex array
size_t foreign_field; // foreign_field is too represented as index in the Entry.second array
};

using JoinExpressionsVec = Vector<JoinExpression>;

/* Each index can have several join expressions, e.g.:
JOIN index1 ON index1.field1 = other_index.field2 AND index1.field3 = other_index.field4
will result in:
{"index1", {{"field1", "other_index", "field2"}, {"field3", "other_index", "field4"}}} */
using IndexesJoinExpressions = absl::Span<const JoinExpressionsVec>;

/* Joins all indexes in indexes_map using join_expressions.
Join algorithm is used is hash join. */
Vector<Vector<Key>> JoinAllIndexes(EntriesPerIndex indexes_entries, IndexesJoinExpressions joins);

} // namespace dfly::join
Loading
Loading