Multi-Part Database Import

Import databases into managed SQL Server instances using the Data Bridge Import/Upload API

Data Bridge facilitates the migration of exposure and results data into the cloud. When using the cloud to store our data, we need to ensure that we can upload that data quickly and reliably.

The Import/Upload API defines services that enable you to upload and import large files (greater than 5 GB in size) in multiple parts. Use this service for API integrations. RMS recommends that you use multi-part upload for all data integration projects. Although this workflow is more complicated than a single-stream upload workflow, multi-part upload jobs provide greater throughput and better recovery from network issues.

📘

Upload Optimizations

This tutorial does not discuss other optimizations that you should consider before implementing this process in production, such as multi-threaded streaming (which can increase upload throughput; see the sketch below), failure recovery, and exception handling.
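
As an illustration of the multi-threaded streaming mentioned above, the following Python sketch uploads several chunks in parallel using concurrent.futures. It reuses the get_presigned_url and upload_chunk_to_url helpers defined later in this tutorial; the chunks argument and max_workers value are assumptions, and production code would also need retry and error handling.

from concurrent.futures import ThreadPoolExecutor

def upload_chunks_in_parallel(base_url, headers, database_name, file_extension,
                              sql_instance, upload_id, chunks, max_workers=4):
    # chunks is a list of byte strings read from the database artifact, in order.
    # Each part is uploaded on its own thread; the returned dictionary maps
    # part numbers (as strings) to the ETags returned for each part.
    def upload_part(part_number, chunk):
        url = get_presigned_url(base_url, headers, database_name, file_extension,
                                sql_instance, upload_id, part_number)
        return str(part_number), upload_chunk_to_url(url, chunk)

    etags = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(upload_part, i + 1, chunk)
                   for i, chunk in enumerate(chunks)]
        for future in futures:
            part_number, etag = future.result()
            etags[part_number] = etag
    return etags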

Step 1: Identify SQL Server instance

Use the Get SQL Server instances service (GET /databridge/v1/sql-instances) to request information about the SQL Server instances available on your tenant’s Data Bridge. The service takes no parameters.

As with all requests that use Data Bridge API services, you must specify a host in the service URL and provide a valid API key with the request. The {host} identifies the environment hosting your Data Bridge instance (one of api-euw1.rms.com or api-use1.rms.com) and the apiKey specifies an API key token. For details, see Risk Intelligence Authentication.

In the example, we assume that the tenant has a single SQL Server instance.

C# example

public static void Main(string[] args) 
{ 
    var baseUrl = "https://api-euw1.rms.com"; 
    var apiKey = "xxxxxxxxxx"; 
    var sqlInstance = GetSqlInstance(baseUrl, apiKey); 
} 
 
private static string GetSqlInstance(string baseUrl, string token) 
{ 
    var request = (HttpWebRequest)WebRequest.Create($"{baseUrl}/databridge/v1/sql-instances"); 
    request.Method = "GET"; 
    request.Headers.Add("Authorization", token); 
    var response = (HttpWebResponse)request.GetResponse(); 
    using (var streamReader = new StreamReader(response.GetResponseStream())) 
    { 
        var content = streamReader.ReadToEnd(); 
        var jsonResponse = JArray.Parse(content); 
        return jsonResponse.First()["name"].ToString(); 
    } 
} 

Python example

import requests 
 
def get_auth_header(auth_key): 
    headers = { 
        "Authorization": auth_key 
    } 
    return headers 
 
def get_sql_instance(base_url, headers): 
    url = f"{base_url}/databridge/v1/sql-instances" 
    response = requests.get(url, headers=headers) 
    response.raise_for_status() 
    return response.json()[0]["name"] 
 
if __name__ == "__main__": 
    base_url = "https://api-euw1.rms.com" 
    api_key = "xxxxxxxxxx" 
    auth_header = get_auth_header(api_key) 
    sql_instance = get_sql_instance(base_url, auth_header) 

The service returns an array of SQL Server instances, each described by name, uid, and endpoint attributes. Use the value of the name attribute to identify a SQL Server instance in Step 2.
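
For reference, the response body has roughly the following shape; the values shown here are placeholders, not real instance details:

[
  {
    "name": "databridge-1",
    "uid": "00000000-0000-0000-0000-000000000000",
    "endpoint": "databridge-1.example.rms.com"
  }
]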

Step 2: Initiate multi-part upload job

Once you have the name of your SQL Server instance, you can use the Upload database artifacts service (POST /databridge/v1/sql-instances/{instanceName}/databases/{databaseName}/{fileExtension}/init-upload) to initiate a multi-part upload job.

The service takes three path parameters: instanceName, databaseName, and fileExtension. The instanceName parameter specifies the name of a SQL Server instance. Use the value returned by the Get SQL Server instances service in Step 1. The databaseName parameter specifies the name that will be given to the new database on the SQL Server instance. The fileExtension parameter identifies the file type of the uploaded database artifact: one of mdf, bak, or dacpac.

In the example, we create a multi-part upload job to transfer an MDF database artifact file to the cloud and to create a new database named my_edm on our SQL Server instance.

C# example

public static void Main(string[] args) 
{ 
    // Previous steps omitted. See Step 1 
    var fileExtension = "mdf"; 
    var edmName = "my_edm"; 
    var uploadId = GenerateUploadId(baseUrl, apiKey, sqlInstance, fileExtension, edmName); 
} 
 
private static string GenerateUploadId(string baseUrl, string token, string sqlInstance, string fileExtension, string databaseName) 
{ 
    var request = (HttpWebRequest)WebRequest.Create($"{baseUrl}/databridge/v1/sql-instances/{sqlInstance}/databases/{databaseName}/{fileExtension}/init-upload"); 
    request.Method = "POST"; 
    request.Headers.Add("Authorization", token); 
    var response = (HttpWebResponse)request.GetResponse(); 
    using (var streamReader = new StreamReader(response.GetResponseStream())) 
    { 
        var content = streamReader.ReadToEnd(); 
        var jsonResponse = JObject.Parse(content); 
        return jsonResponse["uploadId"].ToString(); 
    } 
} 

Python example

def generate_upload_id(base_url, headers, database_name, file_extension, sql_instance): 
    url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/init-upload" 
    response = requests.post(url, headers=headers) 
    response.raise_for_status() 
    return response.json()["uploadId"] 
 
if __name__ == "__main__": 
    # Previous steps omitted. See Step 1 
    database_name = "my_python_edm" 
    file_extension = "mdf" 
    upload_id = generate_upload_id(base_url, auth_header, database_name, file_extension, sql_instance) 

If successful, the service initiates the multi-part upload job and returns an upload job ID.
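
The response body carries the ID under the uploadId attribute (as read by the code above); the value shown here is a placeholder:

{
  "uploadId": "example-upload-id"
}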

Step 3: Upload file chunks to S3 buckets

Now that you have an upload ID for the multi-part upload job, you can break the database artifact file into discrete chunks of data and transfer those chunks individually to the cloud using the Update data files service (POST /databridge/v1/sql-instances/{instanceName}/databases/{databaseName}/{fileExtension}/upload-part/{uploadId}/{partNumber}).

First, we read the database artifact from a local directory and break the file data into discrete chunks. Each chunk is identified by a unique partNumber. We iterate over the data and write the parts to a buffer byte array. If the database artifact file is 100 MB in size and you break it into 20 MB chunks, you can expect the loop to run five times.
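
For example, the number of parts can be computed directly from the file size and the chunk size (the sizes below are the illustrative values used in this tutorial):

import math

file_size_bytes = 100 * 1024 * 1024   # 100 MB database artifact
chunk_size_bytes = 20 * 1024 * 1024   # 20 MB chunks
number_of_parts = math.ceil(file_size_bytes / chunk_size_bytes)
print(number_of_parts)                # 5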

Use the Get pre-signed URL service (GET /databridge/v1/sql-instances/{instanceName}/databases/{databaseName}/{fileExtension}/upload-part/{uploadId}/{partNumber}) to retrieve a pre-signed URL for uploading data to AWS. The pre-signed URL provides temporary access to an S3 bucket. Data is transferred as an octet stream to the pre-signed URL.

Responses from the service are collected in the etags dictionary. We collect an ETag from each part of the multi-part upload job and use these values in the next phase of the workflow.

S3 buckets

The GET /databridge/v1/sql-instances/{instanceName}/databases/{databaseName}/import service enables you to fetch the path to an S3 bucket and temporary security credentials that enable you to upload the database artifact to that S3 bucket.

The service takes two required path parameters: instanceName and databaseName. The instanceName parameter specifies the name of the managed SQL Server instance (for example, databridge-1). The databaseName parameter specifies the name of the database artifact (for example, newDbconsumerbak).

In C#, we break the file into chunks using a FileStream and read specified chunks of the file into the buffer before uploading the chunk.

C# example

public static void Main(string[] args) 
{ 
    // Previous steps omitted. See Step 1 and Step 2 
    var localFilePath = "my_edm.mdf"; 
    var bufferSizeInBytes = 20 * 1024 * 1024; // 20 MB chunks 
    UploadFilesUsingMultiPartUpload(baseUrl, apiKey, edmName, localFilePath, fileExtension, bufferSizeInBytes, sqlInstance, uploadId); 
} 
 
private static void UploadFilesUsingMultiPartUpload(string baseUrl, string token, string databaseName, string localFilePath, string fileExtension, int bufferSizeInBytes, string sqlInstanceName, string uploadId) 
{ 
    using var fileStream = File.OpenRead(localFilePath); 
    var partNumber = 1; 
    var etags = new Dictionary<string, string>(); 
 
    while (true) 
    { 
        var buffer = new byte[bufferSizeInBytes]; 
        var bytesRead = fileStream.Read(buffer, 0, buffer.Length); // Read a chunk from the file the size of the buffer 
        if (bytesRead == 0) 
        { 
            break; 
        } 
 
        // Get partial upload URL 
        var uploadUrl = GetPresignedPartialURL(baseUrl, token, databaseName, fileExtension, sqlInstanceName, uploadId, partNumber); 
 
        // Upload chunk to URL 
        etags.Add(partNumber.ToString(), UploadByteArray(buffer, uploadUrl)); 
 
        partNumber++; 
    } 
} 
 
private static string GetPresignedPartialURL(string baseUrl, string token, string databaseName, string fileExtension, string sqlInstanceName, string uploadId, int partNumber) 
{ 
    var request = (HttpWebRequest)WebRequest.Create($"{baseUrl}/databridge/v1/sql-instances/{sqlInstanceName}/databases/{databaseName}/{fileExtension}/upload-part/{uploadId}/{partNumber}"); 
    request.Method = "GET"; 
    request.Headers.Add("Authorization", token); 
    var response = (HttpWebResponse)request.GetResponse(); 
    var uploadUrl = ""; 
    using (var streamReader = new StreamReader(response.GetResponseStream())) 
    { 
        var content = streamReader.ReadToEnd(); 
        uploadUrl = content.ToString(); 
    } 
    return uploadUrl; 
} 
 
private static string UploadByteArray(byte[] buffer, string uploadUrl) 
{ 
    var uploadRequest = (HttpWebRequest)WebRequest.Create(uploadUrl); 
    uploadRequest.Method = "PUT"; 
    uploadRequest.ContentType = "application/octet-stream"; 
    uploadRequest.ContentLength = buffer.Length; 
    using (var dataStream = uploadRequest.GetRequestStream()) 
    { 
        dataStream.Write(buffer, 0, buffer.Length); 
    } 
    var uploadResponse = (HttpWebResponse)uploadRequest.GetResponse(); 
    if (uploadResponse.StatusCode != HttpStatusCode.OK) 
    { 
        throw new Exception("Unable to upload chunk"); 
 
    } 
    return uploadResponse.Headers["ETag"].Replace("\"", ""); 
} 

Python example

def upload_files_using_multi_part_upload(base_url, headers, database_name, local_file_path, file_extension, chunk_size_bytes, sql_instance, upload_id): 
    etags = {} 
    with open(local_file_path, 'rb') as db_file: 
        chunk = db_file.read(chunk_size_bytes) 
        part_number = 1 
        while chunk: 
            url = get_presigned_url(base_url, headers, database_name, file_extension, sql_instance, upload_id, part_number) 
            etag = upload_chunk_to_url(url, chunk) 
            etags[str(part_number)] = etag 
            chunk = db_file.read(chunk_size_bytes) 
            part_number = part_number + 1 
 
 
def upload_chunk_to_url(url, chunk): 
    headers = { 
        "Content-Type": "application/octet-stream" 
    } 
    response = requests.put(url, headers=headers, data=chunk) 
    response.raise_for_status() 
    return response.headers["Etag"].strip('\"') 
 
 
def get_presigned_url(base_url, headers, database_name, file_extension, sql_instance, upload_id, part_number): 
    url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/upload-part/{upload_id}/{part_number}" 
    response = requests.get(url, headers=headers) 
    response.raise_for_status() 
    return response.text 
 
if __name__ == "__main__": 
    # Previous steps omitted. See Step 1 and Step 2 
    chunk_size = 20 * 1024 * 1024 
    local_path = "my_python_edm.mdf" 
    upload_files_using_multi_part_upload(base_url, auth_header, database_name, local_path, file_extension, chunk_size, sql_instance, upload_id) 

For each chunk uploaded to the Amazon S3 bucket, the service returns an ETag header. These values are collected in the etags dictionary and, together with the uploadId, are required to complete the upload in Step 4.

Step 4: Complete file upload

Now that the entire file has been uploaded to the Amazon S3 bucket, use the Complete file upload service (POST /databridge/v1/sql-instances/{instanceName}/databases/{databaseName}/{fileExtension}/complete-upload) to complete the multi-part upload process.

As before, the service requires the instanceName, databaseName, and fileExtension path parameters. We add the uploadId and the etags that we collected while uploading the chunks and send them as part of the payload to Data Bridge.
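
The request payload therefore looks like the following, where the ETag values are placeholders and the keys of etags are the part numbers:

{
  "uploadId": "example-upload-id",
  "etags": {
    "1": "etag-for-part-1",
    "2": "etag-for-part-2"
  }
}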

C# example

private static void UploadFilesUsingMultiPartUpload(string baseUrl, string token, string databaseName, string localFilePath, string fileExtension, int bufferSizeInBytes, string sqlInstanceName, string uploadId) 
{ 
    // Previous steps omitted. See Step 1, Step 2, and Step 3 
    CompleteUpload(baseUrl, token, databaseName, fileExtension, sqlInstanceName, uploadId, etags); 
} 
 
private static void CompleteUpload(string baseUrl, string token, string databaseName, string fileExtension, string sqlInstanceName, string uploadId, Dictionary<string, string> etags) 
{ 
    var completeRequest = (HttpWebRequest)WebRequest.Create($"{baseUrl}/databridge/v1/sql-instances/{sqlInstanceName}/databases/{databaseName}/{fileExtension}/complete-upload"); 
    completeRequest.Method = "POST"; 
    completeRequest.Headers.Add("Authorization", token); 
    using (var streamWriter = new StreamWriter(completeRequest.GetRequestStream())) 
    { 
        var json = string.Format("{{\"uploadId\": \"{0}\" , \"etags\": {1}}}", uploadId, JsonConvert.SerializeObject(etags)); 
        streamWriter.Write(json); 
    } 
    var completeResponse = (HttpWebResponse)completeRequest.GetResponse(); 
    if (completeResponse.StatusCode != HttpStatusCode.OK) 
    { 
        throw new Exception("didn't work"); 
    } 
} 

Python example

def upload_files_using_multi_part_upload(base_url, headers, database_name, local_file_path, file_extension, chunk_size_bytes, sql_instance, upload_id): 
    # Previous steps omitted. See Step 1, Step 2, and Step 3 
    complete_url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/complete-upload" 
    payload = { 
        "uploadId": upload_id, 
        "etags": etags 
    } 
    response = requests.post(complete_url, headers=headers, json=payload) 
    response.raise_for_status() 

Complete code examples

Complete C# example

This tutorial has been created using .NET Core 3.1 and should work for .NET 5. The only external dependency used in this tutorial is Newtonsoft.Json, which can be added as a NuGet dependency.

using System; 
using System.Collections.Generic; 
using System.IO; 
using System.Linq; 
using System.Net; 
using Newtonsoft.Json; 
using Newtonsoft.Json.Linq; 
 
namespace UploadSample 
{ 
    public class MultiPartUploadProgram 
    { 
        public static void Main(string[] args) 
        { 
            var baseUrl = "https://api-euw1.rms.com"; 
            var apiKey = args[0]; 
 
            var fileExtension = "mdf"; 
            var edmName = "my_edm_93"; 
 
            var localFilePath = "my_edm.mdf"; 
            var bufferSizeInBytes = 20 * 1024 * 1024; 
 
            var sqlInstance = GetSqlInstance(baseUrl, apiKey); 
            var uploadId = GenerateUploadId(baseUrl, apiKey, sqlInstance, fileExtension, edmName); 
            UploadFilesUsingMultiPartUpload(baseUrl, apiKey, edmName, localFilePath, fileExtension, bufferSizeInBytes, sqlInstance, uploadId); 
        } 
 
        private static void UploadFilesUsingMultiPartUpload(string baseUrl, string token, string databaseName, string localFilePath, string fileExtension, int bufferSizeInBytes, string sqlInstanceName, string uploadId) 
        { 
            using var fileStream = File.OpenRead(localFilePath); 
            var partNumber = 1; 
            var etags = new Dictionary<string, string>(); 
 
            while (true) 
            { 
                var buffer = new byte[bufferSizeInBytes]; 
                var bytesRead = fileStream.Read(buffer, 0, buffer.Length); // Read a chunk from the file the size of the buffer 
                if (bytesRead == 0) 
                { 
                    break; 
                } 
 
                // Get partial upload URL 
                var uploadUrl = GetPresignedPartialURL(baseUrl, token, databaseName, fileExtension, sqlInstanceName, uploadId, partNumber); 
 
                // Upload chunk to URL 
                etags.Add(partNumber.ToString(), UploadByteArray(buffer, uploadUrl)); 
                partNumber++; 
            } 
 
            CompleteUpload(baseUrl, token, databaseName, fileExtension, sqlInstanceName, uploadId, etags); 
        } 
 
        private static string GetPresignedPartialURL(string baseUrl, string token, string databaseName, string fileExtension, string sqlInstanceName, string uploadId, int partNumber) 
        { 
            var request = (HttpWebRequest)WebRequest.Create($"{baseUrl}/databridge/v1/sql-instances/{sqlInstanceName}/databases/{databaseName}/{fileExtension}/upload-part/{uploadId}/{partNumber}"); 
            request.Method = "GET"; 
            request.Headers.Add("Authorization", token); 
            var response = (HttpWebResponse)request.GetResponse(); 
            var uploadUrl = ""; 
            using (var streamReader = new StreamReader(response.GetResponseStream())) 
            { 
                var content = streamReader.ReadToEnd(); 
                uploadUrl = content.ToString(); 
            } 
            return uploadUrl; 
        } 
 
        private static string UploadByteArray(byte[] buffer, string uploadUrl) 
        { 
            var uploadRequest = (HttpWebRequest)WebRequest.Create(uploadUrl); 
            uploadRequest.Method = "PUT"; 
            uploadRequest.ContentType = "application/octet-stream"; 
            uploadRequest.ContentLength = buffer.Length; 
            using (var dataStream = uploadRequest.GetRequestStream()) 
            { 
                dataStream.Write(buffer, 0, buffer.Length); 
            } 
            var uploadResponse = (HttpWebResponse)uploadRequest.GetResponse(); 
            if (uploadResponse.StatusCode != HttpStatusCode.OK) 
            { 
                throw new Exception("Unable to upload chunk"); 
 
            } 
            return uploadResponse.Headers["ETag"].Replace("\"", ""); 
        } 
 
        private static void CompleteUpload(string baseUrl, string token, string databaseName, string fileExtension, string sqlInstanceName, string uploadId, Dictionary<string, string> etags) 
        { 
            var completeRequest = (HttpWebRequest)WebRequest.Create($"{baseUrl}/databridge/v1/sql-instances/{sqlInstanceName}/databases/{databaseName}/{fileExtension}/complete-upload"); 
            completeRequest.Method = "POST"; 
            completeRequest.Headers.Add("Authorization", token); 
            using (var streamWriter = new StreamWriter(completeRequest.GetRequestStream())) 
            { 
                var json = string.Format("{{\"uploadId\": \"{0}\" , \"etags\": {1}}}", uploadId, JsonConvert.SerializeObject(etags)); 
                streamWriter.Write(json); 
            } 
            var completeResponse = (HttpWebResponse)completeRequest.GetResponse(); 
            if (completeResponse.StatusCode != HttpStatusCode.OK) 
            { 
                throw new Exception("didn't work"); 
            } 
        } 
 
        private static string GenerateUploadId(string baseUrl, string token, string sqlInstance, string fileExtension, string databaseName) 
        { 
            var request = (HttpWebRequest)WebRequest.Create($"{baseUrl}/databridge/v1/sql-instances/{sqlInstance}/databases/{databaseName}/{fileExtension}/init-upload"); 
            request.Method = "POST"; 
            request.Headers.Add("Authorization", token); 
            var response = (HttpWebResponse)request.GetResponse(); 
            using (var streamReader = new StreamReader(response.GetResponseStream())) 
            { 
                var content = streamReader.ReadToEnd(); 
                var jsonResponse = JObject.Parse(content); 
                return jsonResponse["uploadId"].ToString(); 
            } 
        } 
 
 
        private static string GetSqlInstance(string baseUrl, string token) 
        { 
            var request = (HttpWebRequest)WebRequest.Create($"{baseUrl}/databridge/v1/sql-instances"); 
            request.Method = "GET"; 
            request.Headers.Add("Authorization", token); 
            var response = (HttpWebResponse)request.GetResponse(); 
            using (var streamReader = new StreamReader(response.GetResponseStream())) 
            { 
                var content = streamReader.ReadToEnd(); 
                var jsonResponse = JArray.Parse(content); 
                return jsonResponse.First()["name"].ToString(); 
            } 
        } 
    } 
} 

Complete Python example

All code samples are written in Python 3.9. The tutorial requires one external dependency, the requests library, which can be installed with pip.

import requests

def upload_files_using_multi_part_upload(base_url, headers, database_name, local_file_path, file_extension, chunk_size_bytes, sql_instance, upload_id):
    etags = {}
    with open(local_file_path, 'rb') as db_file:
        chunk = db_file.read(chunk_size_bytes)
        part_number = 1
        while chunk:
            url = get_presigned_url(base_url, headers, database_name, file_extension, sql_instance, upload_id, part_number)
            etag = upload_chunk_to_url(url, chunk)
            etags[str(part_number)] = etag
            chunk = db_file.read(chunk_size_bytes)
            part_number = part_number + 1
    complete_url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/complete-upload"
    payload = {
        "uploadId": upload_id,
        "etags": etags
    }
    response = requests.post(complete_url, headers=headers, json=payload)
    response.raise_for_status()

def upload_chunk_to_url(url, chunk):
    headers = {
        "Content-Type": "application/octet-stream"
    }
    response = requests.put(url, headers=headers, data=chunk)
    response.raise_for_status()
    return response.headers["Etag"].strip('"')

def get_presigned_url(base_url, headers, database_name, file_extension, sql_instance, upload_id, part_number):
    url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/upload-part/{upload_id}/{part_number}"
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.text

def get_sql_instance(base_url, headers):
    url = f"{base_url}/databridge/v1/sql-instances"
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.json()[0]["name"]

def generate_upload_id(base_url, headers, database_name, file_extension, sql_instance):
    url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/init-upload"
    response = requests.post(url, headers=headers)
    response.raise_for_status()
    return response.json()["uploadId"]

def get_auth_header(auth_key):
    headers = {
        "Authorization": auth_key
    }
    return headers

if __name__ == "__main__":
    base_url = "https://api-euw1.rms.com"
    api_key = "xxxxxxxxxx"
    database_name = "my_python_edm"
    local_path = "my_python_edm.mdf"
    file_extension = "mdf"
    chunk_size = 20 * 1024 * 1024
    auth_header = get_auth_header(api_key)
    sql_instance = get_sql_instance(base_url, auth_header)
    upload_id = generate_upload_id(base_url, auth_header, database_name, file_extension, sql_instance)
    upload_files_using_multi_part_upload(base_url, auth_header, database_name, local_path, file_extension, chunk_size, sql_instance, upload_id)


 
