Import Large Databases

Import large databases to Data Bridge

Overview

The Data Bridge Import/Upload API defines operations that enable RMS customers to upload and import large databases (files greater than 5GB in size) to the cloud in multiple parts.

This process leverages Amazon Web Services (AWS) APIs to enable tenants to upload database artifacts (in BAK, DACPAC, or MDF format) to storage buckets on AWS in multiple parts. Once the artifact is uploaded to the cloud, the tenant can use the Data Bridge Jobs API to import the database into a managed server instance on Data Bridge.

RMS recommends that you use multi-part upload for all data integration projects. Although this workflow is more complicated than a single-stream upload workflow, multi-part upload jobs provide greater throughput and recover better from network issues.
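One reason multi-part uploads recover better is that a failed part can be retried in isolation rather than restarting the whole transfer. As a minimal sketch, here is a per-part retry helper with exponential backoff; the `put_chunk` callable is a hypothetical stand-in for the per-part upload call shown in Step 3:

```python
import time

def upload_with_retry(put_chunk, chunk, max_attempts=5, base_delay=1.0):
    """Call put_chunk(chunk) until it succeeds, backing off exponentially.

    put_chunk is expected to return the part's ETag on success and raise
    on transient failures (network errors, 5xx responses, and so on).
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return put_chunk(chunk)
        except Exception:
            if attempt == max_attempts:
                raise  # Give up after the final attempt
            time.sleep(base_delay * 2 ** (attempt - 1))
```

With a single-stream upload, the same transient failure would force the entire file to be re-sent.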

📘

Upload Optimizations

This tutorial does not discuss other optimizations that you should consider before implementing this process in production, such as multi-threaded streaming (which can increase upload throughput), failure recovery, and exception handling.
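As a rough illustration of the multi-threaded streaming mentioned above (not part of this tutorial's workflow), parts can be uploaded concurrently because each part has its own pre-signed URI. The `upload_part` callable is a hypothetical stand-in for the per-part upload in Step 3:

```python
from concurrent.futures import ThreadPoolExecutor

def upload_parts_concurrently(parts, upload_part, max_workers=4):
    """Upload numbered chunks in parallel and collect their ETags.

    parts: dict mapping part_number (int) -> chunk bytes.
    upload_part: callable (part_number, chunk) -> ETag string.
    """
    etags = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(upload_part, n, c): n for n, c in parts.items()}
        for future, part_number in futures.items():
            etags[str(part_number)] = future.result()  # Propagates any upload error
    return etags
```

In production you would combine this with per-part retries and bound the number of chunks held in memory at once.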

In the following sections, we illustrate this recipe using the C# and Python programming languages.

Step 1: Identify server instance

Use the Get server instances operation (/databridge/v1/sql-instances) to request information about the managed server instances available on Data Bridge. The operation takes no parameters.

As with all requests that use Data Bridge API operations, you must specify a host in the operation URI and provide a valid API key with the request. The {host} identifies the environment hosting your Data Bridge instance (one of api-euw1.rms.com or api-use1.rms.com) and the apiKey specifies an API key token. For details, see Authentication and Authorization.

In the example, we assume that the tenant has a single managed server instance.

C# example

public static void Main(string[] args) 
{ 
    var baseUrl = "https://api-euw1.rms.com"; 
    var apiKey = "xxxxxxxxxx"; 
    var sqlInstance = GetSqlInstance(baseUrl, apiKey); 
} 
 
private static string GetSqlInstance(string baseUrl, string token) 
{ 
    var request = (HttpWebRequest)WebRequest.Create($"{baseUrl}/databridge/v1/sql-instances"); 
    request.Method = "GET"; 
    request.Headers.Add("Authorization", token); 
    var response = (HttpWebResponse)request.GetResponse(); 
    using (var streamReader = new StreamReader(response.GetResponseStream())) 
    { 
        var content = streamReader.ReadToEnd(); 
        var jsonResponse = JArray.Parse(content); 
        return jsonResponse.First()["name"].ToString(); 
    } 
} 

Python example

import requests 
 
def get_auth_header(auth_key): 
    headers = { 
        "Authorization": auth_key 
    } 
    return headers 
 
def get_sql_instance(base_url, headers): 
    url = f"{base_url}/databridge/v1/sql-instances" 
    response = requests.get(url, headers=headers) 
    response.raise_for_status() 
    return response.json()[0]["name"] 
 
if __name__ == "__main__": 
    base_url = "https://api-euw1.rms.com" 
    api_key = "xxxxxxxxxx" 
    auth_header = get_auth_header(api_key) 
    sql_instance = get_sql_instance(base_url, auth_header) 

The operation returns an array of managed server instances, each described by name, uid, and endpoint attributes. Use the value of the name attribute to identify a server instance in Step 2.
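If your tenant has more than one managed server instance, you may want to select one by name rather than taking the first element of the array. A small sketch, assuming the response shape described above (name, uid, and endpoint attributes per instance):

```python
def pick_instance(instances, preferred_name=None):
    """Return the name of the preferred instance, or the first one if no preference."""
    if preferred_name is None:
        return instances[0]["name"]
    for instance in instances:
        if instance["name"] == preferred_name:
            return instance["name"]
    raise ValueError(f"No managed server instance named {preferred_name!r}")
```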

Step 2: Initiate multi-part upload job

Once you have the name of your server instance, you can use the Initiate multipart upload operation (POST /v1/sql-instances/{instanceName}/databases/{databaseName}/{fileExtension}/init-upload) to initiate a multi-part upload job.

The operation takes three path parameters: instanceName, databaseName, and fileExtension:

  • The instanceName parameter specifies the name of a server instance. Use the value returned in Step 1.
  • The databaseName parameter specifies the name that will be given to the new database on the server instance.
  • The fileExtension parameter identifies the file type of the uploaded database artifact: one of mdf, bak, or dacpac.

In the example, we create a multi-part upload job to transfer an MDF database artifact file to the cloud and to create a new database named my_edm on our server instance.

C# example

public static void Main(string[] args) 
{ 
    // Previous steps omitted. See Step 1 
    var fileExtension = "mdf"; 
    var edmName = "my_edm"; 
    var uploadId = GenerateUploadId(baseUrl, apiKey, sqlInstance, fileExtension, edmName); 
} 
 
private static string GenerateUploadId(string baseUrl, string token, string sqlInstance, string fileExtension, string databaseName) 
{ 
    var request = (HttpWebRequest)WebRequest.Create($"{baseUrl}/databridge/v1/sql-instances/{sqlInstance}/databases/{databaseName}/{fileExtension}/init-upload"); 
    request.Method = "POST"; 
    request.Headers.Add("Authorization", token); 
    var response = (HttpWebResponse)request.GetResponse(); 
    using (var streamReader = new StreamReader(response.GetResponseStream())) 
    { 
        var content = streamReader.ReadToEnd(); 
        var jsonResponse = JObject.Parse(content); 
        return jsonResponse["uploadId"].ToString(); 
    } 
} 

Python example

def generate_upload_id(base_url, headers, database_name, file_extension, sql_instance): 
    url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/init-upload" 
    response = requests.post(url, headers=headers) 
    response.raise_for_status() 
    return response.json()["uploadId"] 
 
if __name__ == "__main__": 
    # Previous steps omitted. See Step 1 
    database_name = "my_python_edm" 
    file_extension = "mdf" 
    upload_id = generate_upload_id(base_url, auth_header, database_name, file_extension, sql_instance) 

If successful, the operation constructs a request for uploading the database artifact to Data Bridge and returns an upload job ID.

Step 3: Upload file chunks to storage buckets

Now that you have an upload ID for the multi-part upload job, you can break the database artifact file into discrete chunks of data, acquire a pre-signed URI for a storage bucket on AWS, and transfer the chunks to that storage bucket.

  1. First, we read the database artifact from a local directory and break the file data into discrete chunks. Each chunk is identified by a unique partNumber. We iterate over the data and write the parts to a buffer byte array. If the database artifact file is 100 MB in size and you break it into 20 MB chunks, you can expect the loop to run five times.
  2. Use the Get pre-signed URI for multipart upload operation (GET /v1/sql-instances/{instanceName}/databases/{databaseName}/{fileExtension}/upload-part/{uploadId}/{partNumber}) to retrieve a pre-signed URI for uploading data to AWS. The pre-signed URI provides temporary access to a storage bucket. Data is transferred as an octet stream to the pre-signed URI.
  3. Once you have the pre-signed URI, use the Upload data part by number operation (PUT /v1/sql-instances/{instanceName}/databases/{databaseName}/{fileExtension}/upload-part/{uploadId}/{partNumber}) to upload each chunk of the database artifact.
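The loop-count arithmetic from step 1 can be sketched as a helper: the number of parts is the file size divided by the chunk size, rounded up.

```python
import math

def part_count(file_size_bytes, chunk_size_bytes):
    """Number of chunks a multi-part upload will produce for a given file."""
    return math.ceil(file_size_bytes / chunk_size_bytes)
```

For the 100 MB file and 20 MB chunks in the example, this yields 5; a 101 MB file would yield 6, because the final partial chunk still counts as a part.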

For each part we upload, we record the ETag header returned by the storage service in the etags dictionary, keyed by part number. We will use these etags in Step 4.
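As an optional integrity check: for standard (non-KMS-encrypted) uploads, S3 typically returns a part's ETag as the hex MD5 digest of the part's bytes. This is an assumption about the AWS backing store, not something the Data Bridge API guarantees, but when it holds you can verify each chunk locally:

```python
import hashlib

def local_part_md5(chunk):
    """Hex MD5 digest of a chunk, for comparison against the part's returned ETag."""
    return hashlib.md5(chunk).hexdigest()
```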

C# example

In C#, we break the file into chunks using a FileStream and read specified chunks of the file into the buffer before uploading the chunk.

public static void Main(string[] args) 
{ 
    // Previous steps omitted. See Step 1 and Step 2 
    var localFilePath = "my_edm.mdf"; 
    var bufferSizeInBytes = 100 * 1024 * 1024; 
    UploadFilesUsingMultiPartUpload(baseUrl, apiKey, edmName, localFilePath, fileExtension, bufferSizeInBytes, sqlInstance, uploadId); 
} 
 
private static void UploadFilesUsingMultiPartUpload(string baseUrl, string token, string databaseName, string localFilePath, string fileExtension, int bufferSizeInBytes, string sqlInstanceName, string uploadId) 
{ 
    using var fileStream = File.OpenRead(localFilePath); // Dispose the stream when the upload loop completes 
    var partNumber = 1; 
    var etags = new Dictionary<string, string>(); 
 
    while (true) 
    { 
        var buffer = new byte[bufferSizeInBytes]; 
        var bytesRead = fileStream.Read(buffer, 0, buffer.Length); // Read a chunk from the file the size of the buffer 
        if (bytesRead == 0) 
        { 
            break; 
        } 
        if (bytesRead < buffer.Length) 
        { 
            Array.Resize(ref buffer, bytesRead); // Trim the final chunk so only the bytes read are uploaded 
        } 
 
        // Get partial upload URI 
        var uploadUrl = GetPresignedPartialURL(baseUrl, token, databaseName, fileExtension, sqlInstanceName, uploadId, partNumber); 
 
        // Upload chunk to URI 
        etags.Add(partNumber.ToString(), UploadByteArray(buffer, uploadUrl)); 
 
        partNumber++; 
    } 
} 
 
private static string GetPresignedPartialURL(string baseUrl, string token, string databaseName, string fileExtension, string sqlInstanceName, string uploadId, int partNumber) 
{ 
    var request = (HttpWebRequest)WebRequest.Create($"{baseUrl}/databridge/v1/sql-instances/{sqlInstanceName}/databases/{databaseName}/{fileExtension}/upload-part/{uploadId}/{partNumber}"); 
    request.Method = "GET"; 
    request.Headers.Add("Authorization", token); 
    var response = (HttpWebResponse)request.GetResponse(); 
    var uploadUrl = ""; 
    using (var streamReader = new StreamReader(response.GetResponseStream())) 
    { 
        var content = streamReader.ReadToEnd(); 
        uploadUrl = content.ToString(); 
    } 
    return uploadUrl; 
} 
 
private static string UploadByteArray(byte[] buffer, string uploadUrl) 
{ 
    var uploadRequest = (HttpWebRequest)WebRequest.Create(uploadUrl); 
    uploadRequest.Method = "PUT"; 
    uploadRequest.ContentType = "application/octet-stream"; 
    uploadRequest.ContentLength = buffer.Length; 
    using (var dataStream = uploadRequest.GetRequestStream()) 
    { 
        dataStream.Write(buffer, 0, buffer.Length); 
    } 
    var uploadResponse = (HttpWebResponse)uploadRequest.GetResponse(); 
    if (uploadResponse.StatusCode != HttpStatusCode.OK) 
    { 
        throw new Exception("Unable to upload chunk"); 
 
    } 
    return uploadResponse.Headers["ETag"].Replace("\"", ""); 
} 

Python example

def upload_files_using_multi_part_upload(base_url, headers, database_name, local_file_path, file_extension, chunk_size_bytes, sql_instance, upload_id): 
    etags = {} 
    with open(local_file_path, 'rb') as db_file: 
        chunk = db_file.read(chunk_size_bytes) 
        part_number = 1 
        while chunk: 
            url = get_presigned_url(base_url, headers, database_name, file_extension, sql_instance, upload_id, part_number) 
            etag = upload_chunk_to_url(url, chunk) 
            etags[str(part_number)] = etag 
            chunk = db_file.read(chunk_size_bytes) 
            part_number = part_number + 1 
 
 
def upload_chunk_to_url(url, chunk): 
    headers = { 
        "Content-Type": "application/octet-stream" 
    } 
    response = requests.put(url, headers=headers, data=chunk) 
    response.raise_for_status() 
    return response.headers["Etag"].strip('\"') 
 
 
def get_presigned_url(base_url, headers, database_name, file_extension, sql_instance, upload_id, part_number): 
    url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/upload-part/{upload_id}/{part_number}" 
    response = requests.get(url, headers=headers) 
    response.raise_for_status() 
    return response.text 
 
if __name__ == "__main__": 
    # Previous steps omitted. See Step 1 and Step 2 
    chunk_size = 20 * 1024 * 1024 
    local_path = "my_python_edm.mdf" 
    upload_files_using_multi_part_upload(base_url, auth_header, database_name, local_path, file_extension, chunk_size, sql_instance, upload_id) 

For each chunk uploaded to the Amazon storage bucket, the operation returns an ETag header, which we collect in the etags dictionary alongside the uploadId.

Step 4: Complete file upload

Now that the entire file has been uploaded to the Amazon storage bucket, use the Complete multipart upload operation (POST /databridge/v1/sql-instances/{instanceName}/databases/{databaseName}/{fileExtension}/complete-upload) to complete the multi-part upload process.

As before, the operation requires the instanceName, databaseName, and fileExtension path parameters. We add the uploadId and the etags that we collected during the chunk uploads and send them as part of the payload.

C# example

private static void UploadFilesUsingMultiPartUpload(string baseUrl, string token, string databaseName, string localFilePath, string fileExtension, int bufferSizeInBytes, string sqlInstanceName, string uploadId) 
{ 
    // Previous steps omitted. See Step 1, Step 2, and Step 3 
    CompleteUpload(baseUrl, token, databaseName, fileExtension, sqlInstanceName, uploadId, etags); 
} 
 
private static void CompleteUpload(string baseUrl, string token, string databaseName, string fileExtension, string sqlInstanceName, string uploadId, Dictionary<string, string> etags) 
{ 
    var completeRequest = (HttpWebRequest)WebRequest.Create($"{baseUrl}/databridge/v1/sql-instances/{sqlInstanceName}/databases/{databaseName}/{fileExtension}/complete-upload"); 
    completeRequest.Method = "POST"; 
    completeRequest.Headers.Add("Authorization", token); 
    using (var streamWriter = new StreamWriter(completeRequest.GetRequestStream())) 
    { 
        var json = string.Format("{{\"uploadId\": \"{0}\" , \"etags\": {1}}}", uploadId, JsonConvert.SerializeObject(etags)); 
        streamWriter.Write(json); 
    } 
    var completeResponse = (HttpWebResponse)completeRequest.GetResponse(); 
    if (completeResponse.StatusCode != HttpStatusCode.OK) 
    { 
        throw new Exception("Unable to complete multi-part upload"); 
    } 
} 

Python example

def upload_files_using_multi_part_upload(base_url, headers, database_name, local_file_path, file_extension, chunk_size_bytes, sql_instance, upload_id): 
    # Previous steps omitted. See Step 1, Step 2, and Step 3 
    complete_url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/complete-upload" 
    payload = { 
        "uploadId": upload_id, 
        "etags": etags 
    } 
    response = requests.post(complete_url, headers=headers, json=payload) 
    response.raise_for_status() 

Complete code examples

Complete C# example

This tutorial was created using .NET Core 3.1 and should also work with .NET 5. The only external dependency is Newtonsoft.Json, which can be added as a NuGet package.

using System; 
using System.Collections.Generic; 
using System.IO; 
using System.Linq; 
using System.Net; 
using Newtonsoft.Json; 
using Newtonsoft.Json.Linq; 
 
namespace UploadSample 
{ 
    public class MultiPartUploadProgram 
    { 
        public static void Main(string[] args) 
        { 
            var baseUrl = "https://api-euw1.rms.com"; 
            var apiKey = args[0]; 
 
            var fileExtension = "mdf"; 
            var edmName = "my_edm_93"; 
 
            var localFilePath = "my_edm.mdf"; 
            var bufferSizeInBytes = 20 * 1024 * 1024; 
 
            var sqlInstance = GetSqlInstance(baseUrl, apiKey); 
            var uploadId = GenerateUploadId(baseUrl, apiKey, sqlInstance, fileExtension, edmName); 
            UploadFilesUsingMultiPartUpload(baseUrl, apiKey, edmName, localFilePath, fileExtension, bufferSizeInBytes, sqlInstance, uploadId); 
        } 
 
        private static void UploadFilesUsingMultiPartUpload(string baseUrl, string token, string databaseName, string localFilePath, string fileExtension, int bufferSizeInBytes, string sqlInstanceName, string uploadId) 
        { 
            using var fileStream = File.OpenRead(localFilePath); // Dispose the stream when the upload loop completes 
            var partNumber = 1; 
            var etags = new Dictionary<string, string>(); 
 
            while (true) 
            { 
                var buffer = new byte[bufferSizeInBytes]; 
                var bytesRead = fileStream.Read(buffer, 0, buffer.Length); // Read a chunk from the file the size of the buffer 
                if (bytesRead == 0) 
                { 
                    break; 
                } 
                if (bytesRead < buffer.Length) 
                { 
                    Array.Resize(ref buffer, bytesRead); // Trim the final chunk so only the bytes read are uploaded 
                } 
 
                // Get partial upload URI 
                var uploadUrl = GetPresignedPartialURL(baseUrl, token, databaseName, fileExtension, sqlInstanceName, uploadId, partNumber); 
 
                // Upload chunk to URI 
                etags.Add(partNumber.ToString(), UploadByteArray(buffer, uploadUrl)); 
                partNumber++; 
            } 
 
            CompleteUpload(baseUrl, token, databaseName, fileExtension, sqlInstanceName, uploadId, etags); 
        } 
 
        private static string GetPresignedPartialURL(string baseUrl, string token, string databaseName, string fileExtension, string sqlInstanceName, string uploadId, int partNumber) 
        { 
            var request = (HttpWebRequest)WebRequest.Create($"{baseUrl}/databridge/v1/sql-instances/{sqlInstanceName}/databases/{databaseName}/{fileExtension}/upload-part/{uploadId}/{partNumber}"); 
            request.Method = "GET"; 
            request.Headers.Add("Authorization", token); 
            var response = (HttpWebResponse)request.GetResponse(); 
            var uploadUrl = ""; 
            using (var streamReader = new StreamReader(response.GetResponseStream())) 
            { 
                var content = streamReader.ReadToEnd(); 
                uploadUrl = content.ToString(); 
            } 
            return uploadUrl; 
        } 
 
        private static string UploadByteArray(byte[] buffer, string uploadUrl) 
        { 
            var uploadRequest = (HttpWebRequest)WebRequest.Create(uploadUrl); 
            uploadRequest.Method = "PUT"; 
            uploadRequest.ContentType = "application/octet-stream"; 
            uploadRequest.ContentLength = buffer.Length; 
            using (var dataStream = uploadRequest.GetRequestStream()) 
            { 
                dataStream.Write(buffer, 0, buffer.Length); 
            } 
            var uploadResponse = (HttpWebResponse)uploadRequest.GetResponse(); 
            if (uploadResponse.StatusCode != HttpStatusCode.OK) 
            { 
                throw new Exception("Unable to upload chunk"); 
 
            } 
            return uploadResponse.Headers["ETag"].Replace("\"", ""); 
        } 
 
        private static void CompleteUpload(string baseUrl, string token, string databaseName, string fileExtension, string sqlInstanceName, string uploadId, Dictionary<string, string> etags) 
        { 
            var completeRequest = (HttpWebRequest)WebRequest.Create($"{baseUrl}/databridge/v1/sql-instances/{sqlInstanceName}/databases/{databaseName}/{fileExtension}/complete-upload"); 
            completeRequest.Method = "POST"; 
            completeRequest.Headers.Add("Authorization", token); 
            using (var streamWriter = new StreamWriter(completeRequest.GetRequestStream())) 
            { 
                var json = string.Format("{{\"uploadId\": \"{0}\" , \"etags\": {1}}}", uploadId, JsonConvert.SerializeObject(etags)); 
                streamWriter.Write(json); 
            } 
            var completeResponse = (HttpWebResponse)completeRequest.GetResponse(); 
            if (completeResponse.StatusCode != HttpStatusCode.OK) 
            { 
                throw new Exception("Unable to complete multi-part upload"); 
            } 
        } 
 
        private static string GenerateUploadId(string baseUrl, string token, string sqlInstance, string fileExtension, string databaseName) 
        { 
            var request = (HttpWebRequest)WebRequest.Create($"{baseUrl}/databridge/v1/sql-instances/{sqlInstance}/databases/{databaseName}/{fileExtension}/init-upload"); 
            request.Method = "POST"; 
            request.Headers.Add("Authorization", token); 
            var response = (HttpWebResponse)request.GetResponse(); 
            using (var streamReader = new StreamReader(response.GetResponseStream())) 
            { 
                var content = streamReader.ReadToEnd(); 
                var jsonResponse = JObject.Parse(content); 
                return jsonResponse["uploadId"].ToString(); 
            } 
        } 
 
 
        private static string GetSqlInstance(string baseUrl, string token) 
        { 
            var request = (HttpWebRequest)WebRequest.Create($"{baseUrl}/databridge/v1/sql-instances"); 
            request.Method = "GET"; 
            request.Headers.Add("Authorization", token); 
            var response = (HttpWebResponse)request.GetResponse(); 
            using (var streamReader = new StreamReader(response.GetResponseStream())) 
            { 
                var content = streamReader.ReadToEnd(); 
                var jsonResponse = JArray.Parse(content); 
                return jsonResponse.First()["name"].ToString(); 
            } 
        } 
    } 
} 

Complete Python example

All code samples are written in Python 3.9. The tutorial requires one external dependency, the requests library, which can be installed with pip.

import requests 
 
def upload_files_using_multi_part_upload(base_url, headers, database_name, local_file_path, file_extension, chunk_size_bytes, sql_instance, upload_id): 
    etags = {} 
    with open(local_file_path, 'rb') as db_file: 
        chunk = db_file.read(chunk_size_bytes) 
        part_number = 1 
        while chunk: 
            url = get_presigned_url(base_url, headers, database_name, file_extension, sql_instance, upload_id, part_number) 
            etag = upload_chunk_to_url(url, chunk) 
            etags[str(part_number)] = etag 
            chunk = db_file.read(chunk_size_bytes) 
            part_number = part_number + 1 
    complete_url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/complete-upload" 
    payload = { 
        "uploadId": upload_id, 
        "etags": etags 
    } 
    response = requests.post(complete_url, headers=headers, json=payload) 
    response.raise_for_status() 
 
 
def upload_chunk_to_url(url, chunk): 
    headers = { 
        "Content-Type": "application/octet-stream" 
    } 
    response = requests.put(url, headers=headers, data=chunk) 
    response.raise_for_status() 
    return response.headers["Etag"].strip('\"') 
 
 
def get_presigned_url(base_url, headers, database_name, file_extension, sql_instance, upload_id, part_number): 
    url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/upload-part/{upload_id}/{part_number}" 
    response = requests.get(url, headers=headers) 
    response.raise_for_status() 
    return response.text 
 
 
def get_sql_instance(base_url, headers): 
    url = f"{base_url}/databridge/v1/sql-instances" 
    response = requests.get(url, headers=headers) 
    response.raise_for_status() 
    return response.json()[0]["name"] 
 
 
def generate_upload_id(base_url, headers, database_name, file_extension, sql_instance): 
    url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/init-upload" 
    response = requests.post(url, headers=headers) 
    response.raise_for_status() 
    return response.json()["uploadId"] 
 
 
def get_auth_header(auth_key): 
    headers = { 
        "Authorization": auth_key 
    } 
    return headers 
 
if __name__ == "__main__": 
    base_url = "https://api-euw1.rms.com" 
    api_key = "***" 
    database_name = "my_python_edm" 
    local_path = "my_python_edm.mdf" 
    file_extension = "mdf" 
    chunk_size = 20 * 1024 * 1024 
    auth_header = get_auth_header(api_key) 
    sql_instance = get_sql_instance(base_url, auth_header) 
    upload_id = generate_upload_id(base_url, auth_header, database_name, file_extension, sql_instance) 
    upload_files_using_multi_part_upload(base_url, auth_header, database_name, local_path, file_extension, chunk_size, sql_instance, upload_id)