Changes from all commits
42 commits
4dd1432
Make dgraph import work over the internet
mangalaman93 Jul 10, 2025
a469b07
Make dgraph import work over the internet
mangalaman93 Jul 10, 2025
764d67f
added comments
harshil-goel Aug 7, 2025
bb12dbe
added comments
harshil-goel Aug 7, 2025
7788177
added comments
harshil-goel Aug 7, 2025
1454366
added comments
harshil-goel Aug 7, 2025
6d5d261
added comments
harshil-goel Aug 7, 2025
d75eed2
added comments
harshil-goel Aug 7, 2025
a605805
added comments
harshil-goel Aug 7, 2025
4c2eef5
added comments
harshil-goel Aug 7, 2025
3268108
added comments
harshil-goel Aug 7, 2025
7fcd4a5
added comments
harshil-goel Aug 7, 2025
a833afa
added comments
harshil-goel Aug 7, 2025
eef5388
added comments
harshil-goel Aug 7, 2025
9ada5b4
added comments
harshil-goel Aug 11, 2025
49a1330
added comments
harshil-goel Aug 11, 2025
60cc930
added comments
harshil-goel Aug 11, 2025
bfe109b
added comments
harshil-goel Aug 11, 2025
fca0f95
added comments
harshil-goel Aug 11, 2025
4bfcb92
added comments
harshil-goel Aug 11, 2025
c317e28
added comments
harshil-goel Aug 11, 2025
9a7f04e
added comments
harshil-goel Aug 11, 2025
8ec94ea
added comments
harshil-goel Aug 11, 2025
4adc364
added comments
harshil-goel Aug 11, 2025
89937a5
added comments
harshil-goel Aug 11, 2025
967df95
added comments
harshil-goel Aug 11, 2025
8c0c9c3
added comments
harshil-goel Aug 11, 2025
9f6a61f
added comments
harshil-goel Aug 12, 2025
5e64803
added comments
harshil-goel Aug 12, 2025
606034a
added comments
harshil-goel Aug 12, 2025
c337276
added comments
harshil-goel Aug 12, 2025
e718dc3
added comments
harshil-goel Aug 12, 2025
a6f7eb2
added comments
harshil-goel Aug 12, 2025
d9a2b7e
added comments
harshil-goel Aug 12, 2025
d65aa94
added comments
harshil-goel Aug 12, 2025
10c7da2
added comments
harshil-goel Aug 12, 2025
c5ec8a2
added comments
harshil-goel Aug 13, 2025
69bd4c8
added comments
harshil-goel Aug 13, 2025
86d58df
added comments
harshil-goel Aug 13, 2025
8c114a7
added comments
harshil-goel Aug 13, 2025
d2f6951
added comments
harshil-goel Aug 13, 2025
61d36e2
added comments
harshil-goel Aug 13, 2025
4 changes: 2 additions & 2 deletions .github/workflows/ci-dgraph-integration2-tests.yml
@@ -24,7 +24,7 @@ jobs:
dgraph-integration2-tests:
if: github.event.pull_request.draft == false
runs-on: warp-ubuntu-latest-x64-4x
timeout-minutes: 30
timeout-minutes: 60
steps:
- uses: actions/checkout@v5
with:
@@ -48,7 +48,7 @@ jobs:
# move the binary
cp dgraph/dgraph ~/go/bin/dgraph
# run the tests
go test -v -timeout=30m -failfast -tags=integration2 ./...
go test -v -timeout=60m -failfast -tags=integration2 ./...
# clean up docker containers after test execution
go clean -testcache
# sleep
4 changes: 3 additions & 1 deletion Dockerfile
@@ -4,11 +4,13 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
bzip2=1.0.8-5+b1 \
git=1:2.39.5-0+deb12u2 \
&& rm -rf /var/lib/apt/lists/*
ARG TARGETARCH=amd64
ARG TARGETOS=linux
WORKDIR /go/src/repo
COPY go.mod go.sum ./
RUN go mod download && go mod verify
COPY . .
RUN CGO_ENABLED=0 make
RUN CGO_ENABLED=0 GOOS=${TARGETOS} GOARCH=${TARGETARCH} make

###################### Stage II ######################
FROM ubuntu:24.04
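The Dockerfile now declares TARGETOS/TARGETARCH build args (defaulting to linux/amd64) and passes them into the Go build, which makes cross-platform image builds possible. A minimal sketch of how such a build might be invoked; the image tag is illustrative:

# Multi-platform build: buildx fills in TARGETOS/TARGETARCH per platform.
docker buildx build --platform linux/amd64,linux/arm64 -t dgraph/dgraph:local .
# Single-platform build, overriding the new args explicitly.
docker build --build-arg TARGETOS=linux --build-arg TARGETARCH=arm64 -t dgraph/dgraph:local .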
45 changes: 33 additions & 12 deletions dgraph/cmd/dgraphimport/import_client.go
@@ -13,6 +13,7 @@ import (
"math"
"os"
"path/filepath"
"sync"

"github.com/dgraph-io/badger/v4"
"github.com/dgraph-io/dgo/v250"
@@ -67,10 +68,13 @@ func initiateSnapshotStream(ctx context.Context, dc apiv2.DgraphClient) (*apiv2.
// subdirectories named with numeric group IDs.
func streamSnapshot(ctx context.Context, dc apiv2.DgraphClient, baseDir string, groups []uint32) error {
glog.Infof("[import] Starting to stream snapshot from directory: %s", baseDir)
var m sync.Mutex

errG, errGrpCtx := errgroup.WithContext(ctx)
for _, group := range groups {
errG.Go(func() error {
m.Lock()
defer m.Unlock()
pDir := filepath.Join(baseDir, fmt.Sprintf("%d", group-1), "p")
if _, err := os.Stat(pDir); err != nil {
return fmt.Errorf("p directory does not exist for group [%d]: [%s]", group, pDir)
@@ -127,23 +131,18 @@ func streamSnapshotForGroup(ctx context.Context, dc apiv2.DgraphClient, pdir str
if err != nil {
return fmt.Errorf("failed to start external snapshot stream for group %d: %w", groupId, err)
}

defer func() {
if _, err := out.CloseAndRecv(); err != nil {
glog.Errorf("failed to close the stream for group [%v]: %v", groupId, err)
}

glog.Infof("[import] Group [%v]: Received ACK ", groupId)
_ = out.CloseSend()
}()

// Open the BadgerDB instance at the specified directory
opt := badger.DefaultOptions(pdir)
opt.ReadOnly = true
ps, err := badger.OpenManaged(opt)
if err != nil {
glog.Errorf("failed to open BadgerDB at [%s]: %v", pdir, err)
return fmt.Errorf("failed to open BadgerDB at [%v]: %v", pdir, err)
}

defer func() {
if err := ps.Close(); err != nil {
glog.Warningf("[import] Error closing BadgerDB: %v", err)
@@ -154,32 +153,41 @@
glog.Infof("[import] Sending request for streaming external snapshot for group ID [%v]", groupId)
groupReq := &apiv2.StreamExtSnapshotRequest{GroupId: groupId}
if err := out.Send(groupReq); err != nil {
return fmt.Errorf("failed to send request for streaming external snapshot for group ID [%v] to the server: %w",
groupId, err)
return fmt.Errorf("failed to send request for group ID [%v] to the server: %w", groupId, err)
}
if _, err := out.Recv(); err != nil {
return fmt.Errorf("failed to receive response for group ID [%v] from the server: %w", groupId, err)
}
glog.Infof("[import] Group [%v]: Received ACK for sending group request", groupId)

// Configure and start the BadgerDB stream
glog.Infof("[import] Starting BadgerDB stream for group [%v]", groupId)

if err := streamBadger(ctx, ps, out, groupId); err != nil {
return fmt.Errorf("badger streaming failed for group [%v]: %v", groupId, err)
}

return nil
}

// streamBadger runs a BadgerDB stream to send key-value pairs to the specified group.
// It creates a new stream at the maximum sequence number and sends the data to the specified group.
// It also sends a final 'done' signal to mark completion.
func streamBadger(ctx context.Context, ps *badger.DB, out apiv2.Dgraph_StreamExtSnapshotClient, groupId uint32) error {
count := 0
stream := ps.NewStreamAt(math.MaxUint64)
stream.LogPrefix = "[import] Sending external snapshot to group [" + fmt.Sprintf("%d", groupId) + "]"
stream.KeyToList = nil
stream.Send = func(buf *z.Buffer) error {
p := &apiv2.StreamPacket{Data: buf.Bytes()}
count += 1
fmt.Println("Packets sent:", len(buf.Bytes()))
if err := out.Send(&apiv2.StreamExtSnapshotRequest{Pkt: p}); err != nil && !errors.Is(err, io.EOF) {
return fmt.Errorf("failed to send data chunk: %w", err)
}
if _, err := out.Recv(); err != nil {
return fmt.Errorf("failed to receive response for group ID [%v] from the server: %w", groupId, err)
}
glog.Infof("[import] Group [%v]: Received ACK for sending data chunk", groupId)

return nil
}

@@ -196,5 +204,18 @@ func streamBadger(ctx context.Context, ps *badger.DB, out apiv2.Dgraph_StreamExt
return fmt.Errorf("failed to send 'done' signal for group [%d]: %w", groupId, err)
}

return nil
fmt.Println("Packets sent:", count)

var bytes *apiv2.StreamExtSnapshotResponse
var err error
for {
if bytes, err = out.Recv(); err != nil {
return fmt.Errorf("failed to receive response for group ID [%v] from the server: %w", groupId, err)
} else {
glog.Infof("[import] Group [%v]: Received ACK for sending completion signal", groupId)
if bytes.Finish {
return nil
}
}
}
}
17 changes: 10 additions & 7 deletions dgraph/cmd/dgraphimport/import_test.go
@@ -1,4 +1,4 @@
//go:build integration
//go:build integration2

/*
* SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
@@ -75,7 +75,8 @@ func TestDrainModeAfterStartSnapshotStream(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
conf := dgraphtest.NewClusterConfig().WithNumAlphas(tt.numAlphas).WithNumZeros(tt.numZeros).WithReplicas(tt.replicas)
conf := dgraphtest.NewClusterConfig().WithNumAlphas(tt.numAlphas).
WithNumZeros(tt.numZeros).WithReplicas(tt.replicas)
c, err := dgraphtest.NewLocalCluster(conf)
require.NoError(t, err)
defer func() { c.Cleanup(t.Failed()) }()
@@ -268,14 +269,12 @@ func runImportTest(t *testing.T, tt testcase) {
require.NoError(t, targetCluster.StopAlpha(alphaID))
}
}

if tt.err != "" {
err := Import(context.Background(), connectionString, outDir)
require.Error(t, err)
require.ErrorContains(t, err, tt.err)
return
}

require.NoError(t, Import(context.Background(), connectionString, outDir))

for group, alphas := range alphaGroups {
@@ -287,7 +286,6 @@ func runImportTest(t *testing.T, tt testcase) {
}

require.NoError(t, targetCluster.HealthCheck(false))

t.Log("Import completed")

for i := 0; i < tt.targetAlphas; i++ {
@@ -330,7 +328,9 @@ func setupBulkCluster(t *testing.T, numAlphas int, encrypted bool) (*dgraphtest.
}

// setupTargetCluster creates and starts a cluster that will receive the imported data
func setupTargetCluster(t *testing.T, numAlphas, replicasFactor int) (*dgraphtest.LocalCluster, *dgraphapi.GrpcClient, func()) {
func setupTargetCluster(t *testing.T, numAlphas, replicasFactor int) (
*dgraphtest.LocalCluster, *dgraphapi.GrpcClient, func()) {

conf := dgraphtest.NewClusterConfig().
WithNumAlphas(numAlphas).
WithNumZeros(3).
@@ -343,6 +343,9 @@ func setupTargetCluster(t *testing.T, numAlphas, replicasFactor int) (*dgraphtes
gc, cleanup, err := cluster.Client()
require.NoError(t, err)

cluster.AssignTs(gc.Dgraph, 25000)
cluster.AssignUids(gc.Dgraph, 65536)

// Return cluster and client (cleanup will be handled by the caller)
return cluster, gc, cleanup
}
@@ -351,7 +354,7 @@ func setupTargetCluster(t *testing.T, numAlphas, replicasFactor int) (*dgraphtes
func verifyImportResults(t *testing.T, gc *dgraphapi.GrpcClient, downAlphas int) {
maxRetries := 1
if downAlphas > 0 {
maxRetries = 10
maxRetries = 5
}

retryDelay := 500 * time.Millisecond
12 changes: 12 additions & 0 deletions dgraph/cmd/dgraphimport/run.go
@@ -34,6 +34,7 @@ func init() {

flag := ImportCmd.Cmd.Flags()
flag.StringP("files", "f", "", "Location of *.rdf(.gz) or *.json(.gz) file(s) to load.")
flag.StringP("snapshot-dir", "p", "", "Location of p directory")
flag.StringP("schema", "s", "", "Location of DQL schema file.")
flag.StringP("graphql_schema", "g", "", "Location of the GraphQL schema file.")
flag.StringP("graphql-schema", "", "", "Location of the GraphQL schema file.")
@@ -72,6 +73,17 @@ func run() {
os.Exit(1)
}

// if snapshot p directory is already provided, there is no need to run bulk loader
if ImportCmd.Conf.GetString("snapshot-dir") != "" {
connStr := ImportCmd.Conf.GetString("conn-str")
snapshotDir := ImportCmd.Conf.GetString("snapshot-dir")
if err := Import(context.Background(), connStr, snapshotDir); err != nil {
fmt.Println("Failed to import data:", err)
os.Exit(1)
}
return
}

cacheSize := 64 << 20 // These are the default values. User can overwrite them using --badger.
cacheDefaults := fmt.Sprintf("indexcachesize=%d; blockcachesize=%d; ",
(70*cacheSize)/100, (30*cacheSize)/100)
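With the new --snapshot-dir flag, dgraph import can skip the bulk-loader step entirely and stream an existing bulk-loader output directory (the one containing per-group p subdirectories) straight to the target cluster. A rough sketch of the two invocation modes; the paths and connection string are illustrative:

# Stream a previously bulk-loaded output directory, skipping the bulk loader.
dgraph import --snapshot-dir /data/out --conn-str "dgraph://localhost:9080"
# Full path: bulk-load the RDF/JSON data first, then stream the result.
dgraph import -f data.rdf.gz -s schema.dql --conn-str "dgraph://localhost:9080"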
2 changes: 1 addition & 1 deletion dgraphtest/config.go
@@ -122,7 +122,7 @@ func NewClusterConfig() ClusterConfig {
numAlphas: 1,
numZeros: 1,
replicas: 1,
verbosity: 2,
verbosity: 3,
version: localVersion,
volumes: map[string]string{DefaultBackupDir: defaultBackupVol, DefaultExportDir: defaultExportVol},
refillInterval: 20 * time.Second,
6 changes: 5 additions & 1 deletion dgraphtest/load.go
@@ -503,8 +503,12 @@ func (c *LocalCluster) BulkLoad(opts BulkOpts) error {
args = append(args, "-g", strings.Join(opts.GQLSchemaFiles, ","))
}

dgraphCmdPath := os.Getenv("DGRAPH_CMD_PATH")
if dgraphCmdPath == "" {
dgraphCmdPath = filepath.Join(c.tempBinDir, "dgraph")
}
log.Printf("[INFO] running bulk loader with args: [%v]", strings.Join(args, " "))
cmd := exec.Command(filepath.Join(c.tempBinDir, "dgraph"), args...)
cmd := exec.Command(dgraphCmdPath, args...)
if out, err := cmd.CombinedOutput(); err != nil {
return errors.Wrapf(err, "error running bulk loader: %v", string(out))
} else {
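BulkLoad now prefers a binary pointed to by the DGRAPH_CMD_PATH environment variable and only falls back to the copy in the cluster's temp bin dir. A hedged example of reusing a pre-built binary while running the integration2 tests; the path mirrors the CI workflow above:

export DGRAPH_CMD_PATH=$HOME/go/bin/dgraph
go test -v -timeout=60m -failfast -tags=integration2 ./...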
34 changes: 34 additions & 0 deletions dgraphtest/local_cluster.go
@@ -898,6 +898,40 @@ func (c *LocalCluster) AlphasLogs() ([]string, error) {
return alphasLogs, nil
}

func (c *LocalCluster) AssignTs(_ *dgo.Dgraph, num uint64) error {
if len(c.zeros) == 0 {
return errors.New("no zero running")
}

baseURL, err := c.zeros[0].assignURL(c)
if err != nil {
return err
}

url := fmt.Sprintf("%v?what=timestamps&num=%d", baseURL, num)
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return errors.Wrapf(err, "error building req for endpoint [%v]", url)
}
body, err := dgraphapi.DoReq(req)
if err != nil {
return err
}
var data struct {
Errors []struct {
Message string
Code string
}
}
if err := json.Unmarshal(body, &data); err != nil {
return errors.Wrap(err, "error unmarshaling response")
}
if len(data.Errors) > 0 {
return fmt.Errorf("error received from zero: %v", data.Errors[0].Message)
}
return nil
}

// AssignUids talks to zero to assign the given number of uids
func (c *LocalCluster) AssignUids(_ *dgo.Dgraph, num uint64) error {
if len(c.zeros) == 0 {
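AssignTs leases a block of timestamps from Zero over its HTTP assign endpoint, mirroring the existing AssignUids helper; the test setup above uses both to pre-lease 25000 timestamps and 65536 UIDs before the import. A rough curl equivalent, assuming Zero's default HTTP port 6080 is reachable (the container-mapped port may differ):

curl "http://localhost:6080/assign?what=timestamps&num=25000"
curl "http://localhost:6080/assign?what=uids&num=65536"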
13 changes: 8 additions & 5 deletions edgraph/server.go
@@ -1829,18 +1829,21 @@ func (s *ServerV25) UpdateExtSnapshotStreamingState(ctx context.Context,

groups, err := worker.ProposeDrain(ctx, req)
if err != nil {
glog.Errorf("[import] failed to propose drain mode: %v", err)
return nil, err
}

resp := &apiv2.UpdateExtSnapshotStreamingStateResponse{Groups: groups}

return resp, nil
return &apiv2.UpdateExtSnapshotStreamingStateResponse{Groups: groups}, nil
}

func (s *ServerV25) StreamExtSnapshot(stream apiv2.Dgraph_StreamExtSnapshotServer) error {
fmt.Println("STREAM EXT STNAPHOST CALLED")
defer x.ExtSnapshotStreamingState(false)

return worker.InStream(stream)
if err := worker.InStream(stream); err != nil {
glog.Errorf("[import] failed to stream external snapshot: %v", err)
return err
}
return nil
}

// CommitOrAbort commits or aborts a transaction.
2 changes: 1 addition & 1 deletion go.mod
@@ -9,7 +9,7 @@ require (
github.com/Masterminds/semver/v3 v3.4.0
github.com/blevesearch/bleve/v2 v2.5.3
github.com/dgraph-io/badger/v4 v4.8.0
github.com/dgraph-io/dgo/v250 v250.0.0-preview4.0.20250619041351-4a519e53fb9d
github.com/dgraph-io/dgo/v250 v250.0.0-preview4.0.20250709182152-32901102e0d0
github.com/dgraph-io/gqlgen v0.13.2
github.com/dgraph-io/gqlparser/v2 v2.2.2
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15
4 changes: 2 additions & 2 deletions go.sum
@@ -132,8 +132,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/badger/v4 v4.8.0 h1:JYph1ChBijCw8SLeybvPINizbDKWZ5n/GYbz2yhN/bs=
github.com/dgraph-io/badger/v4 v4.8.0/go.mod h1:U6on6e8k/RTbUWxqKR0MvugJuVmkxSNc79ap4917h4w=
github.com/dgraph-io/dgo/v250 v250.0.0-preview4.0.20250619041351-4a519e53fb9d h1:9PLyvZY1Nih05g+2womk+kNnX3Gb20kx5BsK3foA5a8=
github.com/dgraph-io/dgo/v250 v250.0.0-preview4.0.20250619041351-4a519e53fb9d/go.mod h1:gLr7uM+x/8PjSQJ4Ca9kfQF15uBzruDzRK3bnELt3vE=
github.com/dgraph-io/dgo/v250 v250.0.0-preview4.0.20250709182152-32901102e0d0 h1:gvNB/M+LrjkX9c4QJCPdODVoxgMt3zlIBW9i8xVfoYo=
github.com/dgraph-io/dgo/v250 v250.0.0-preview4.0.20250709182152-32901102e0d0/go.mod h1:gLr7uM+x/8PjSQJ4Ca9kfQF15uBzruDzRK3bnELt3vE=
github.com/dgraph-io/gqlgen v0.13.2 h1:TNhndk+eHKj5qE7BenKKSYdSIdOGhLqxR1rCiMso9KM=
github.com/dgraph-io/gqlgen v0.13.2/go.mod h1:iCOrOv9lngN7KAo+jMgvUPVDlYHdf7qDwsTkQby2Sis=
github.com/dgraph-io/gqlparser/v2 v2.1.1/go.mod h1:MYS4jppjyx8b9tuUtjV7jU1UFZK6P9fvO8TsIsQtRKU=
1 change: 1 addition & 0 deletions protos/Makefile
@@ -53,6 +53,7 @@ regenerate: tidy-deps copy-protos check clean
@protoc \
--proto_path=/usr/local/include \
--proto_path=/usr/include \
--proto_path=/opt/homebrew/include/google/protobuf \
--proto_path=${PROTO_PATH} \
--go_out=pb --go-grpc_out=pb \
--go_opt=paths=source_relative \
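The extra --proto_path covers protobuf headers installed by Homebrew on Apple Silicon, which live under /opt/homebrew rather than /usr/local. A sketch of regenerating the protos on such a machine, assuming protobuf was installed via Homebrew:

brew install protobuf
make -C protos regenerate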
2 changes: 1 addition & 1 deletion protos/depcheck.sh
@@ -37,7 +37,7 @@ function CompareSemVer() {

function CheckProtobufIncludes() {
echo -n "Checking for directory /usr/include/google/protobuf or /usr/local/include/google/protobuf... "
if !([[ -d /usr/include/google/protobuf ]] || [[ -d /usr/local/include/google/protobuf ]]); then
if !([[ -d /usr/include/google/protobuf ]] || [[ -d /usr/local/include/google/protobuf ]] || [[ -d /opt/homebrew/include/google/protobuf ]]); then
echo "FAIL" >&2
echo "Missing protobuf headers in /usr/include/google/protobuf or /usr/local/include/google/protobuf:" \
"directory not found." >&2
16 changes: 12 additions & 4 deletions protos/patch_pb.sh
@@ -9,10 +9,18 @@
# This patch script applies the necessary changes to pb.pb.go.
PB_GEN_FILE="./pb/pb.pb.go"

sed -i 's/SessionToken string/SessionToken Sensitive/' "${PB_GEN_FILE}"
sed -i 's/SecretKey string/SecretKey Sensitive/' "${PB_GEN_FILE}"
# Function to perform in-place sed that works on both macOS and Linux
sed_inplace() {
local tmpfile
tmpfile=$(mktemp)
sed "$1" "$2" >"${tmpfile}" && mv "${tmpfile}" "$2"
rm -f "${tmpfile}"
}

sed -i 's/GetSessionToken() string {/GetSessionToken() Sensitive {/' "${PB_GEN_FILE}"
sed -i 's/GetSecretKey() string/GetSecretKey() Sensitive/' "${PB_GEN_FILE}"
# Apply the replacements using the cross-platform function
sed_inplace 's/SessionToken string/SessionToken Sensitive/' "${PB_GEN_FILE}"
sed_inplace 's/SecretKey string/SecretKey Sensitive/' "${PB_GEN_FILE}"
sed_inplace 's/GetSessionToken() string {/GetSessionToken() Sensitive {/' "${PB_GEN_FILE}"
sed_inplace 's/GetSecretKey() string/GetSecretKey() Sensitive/' "${PB_GEN_FILE}"

echo "Patches applied successfully."