Multipart Import Databases
Import databases into managed SQL Server instances using the Data Bridge Import/Upload API
Data Bridge facilitates the migration of exposure and results data. When using the cloud to store this data, we need to ensure that we can upload it quickly and reliably.
The Import/Upload API defines services that enable you to upload and import large files (greater than 5 GB) in multiple parts. Use this service for API integrations. RMS recommends multi-part upload for all data integration projects. Although this workflow is more complicated than a single-stream upload, multi-part upload jobs provide greater throughput and better recovery from network issues.
Upload Optimizations
This tutorial does not discuss other optimizations that you should consider before implementing this process in production, such as multi-threaded streaming (which can increase upload throughput), failure recovery, and exception handling.
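As an illustration of what failure recovery might look like, the sketch below retries a failed operation with exponential backoff. The helper and its parameters are illustrative assumptions, not part of the Data Bridge API; because each part of a multi-part upload is independent, a failed chunk can be retried on its own.

```python
import time

def with_retries(operation, max_attempts=3, base_delay_seconds=1.0):
    """Run `operation` (a zero-argument callable), retrying on failure
    with exponential backoff. Illustrative sketch only."""
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception:
            if attempt == max_attempts:
                raise
            # Back off 1s, 2s, 4s, ... between attempts.
            time.sleep(base_delay_seconds * 2 ** (attempt - 1))
```

In production you would typically retry only on transient errors (timeouts, 5xx responses) and request a fresh pre-signed URL if the previous one has expired.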
Step 1: Identify SQL Server instance
Use the Get SQL Server instances service (`GET /databridge/v1/sql-instances`) to request information about the SQL Server instances available on your tenant's Data Bridge. The service takes no parameters.
As with all requests that use Data Bridge API services, you must specify a host in the service URL and provide a valid API key with the request. The `{host}` identifies the environment hosting your Data Bridge instance (one of `api-euw1.rms.com` or `api-use1.rms.com`) and the `apiKey` specifies an API key token. For details, see Risk Intelligence Authentication.
In the example, we assume that the tenant has a single SQL Server instance.
C# example

```csharp
public static void Main(string[] args)
{
    var baseUrl = "https://api-euw1.rms.com";
    var apiKey = "xxxxxxxxxx";
    var sqlInstance = GetSqlInstance(baseUrl, apiKey);
}

private static string GetSqlInstance(string baseUrl, string token)
{
    var request = (HttpWebRequest)WebRequest.Create($"{baseUrl}/databridge/v1/sql-instances");
    request.Method = "GET";
    request.Headers.Add("Authorization", token);

    var response = (HttpWebResponse)request.GetResponse();
    using (var streamReader = new StreamReader(response.GetResponseStream()))
    {
        var content = streamReader.ReadToEnd();
        var jsonResponse = JArray.Parse(content);
        return jsonResponse.First()["name"].ToString();
    }
}
```
Python example

```python
import requests

def get_auth_header(auth_key):
    headers = {
        "Authorization": auth_key
    }
    return headers

def get_sql_instance(base_url, headers):
    url = f"{base_url}/databridge/v1/sql-instances"
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.json()[0]["name"]

if __name__ == "__main__":
    base_url = "https://api-euw1.rms.com"
    api_key = "xxxxxxxxxx"
    auth_header = get_auth_header(api_key)
    sql_instance = get_sql_instance(base_url, auth_header)
```
The service returns an array of SQL Server instances, each described by `name`, `uid`, and `endpoint` attributes. Use the value of the `name` attribute to identify a SQL Server instance in Step 2.
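The examples above simply take the first instance in the response array. If your tenant has more than one instance, you might select one by name instead; a minimal sketch, assuming a parsed response shaped like the hypothetical sample below (the instance names are made up):

```python
def pick_instance(instances, preferred_name):
    """Return the name of the instance matching `preferred_name`,
    falling back to the first instance in the array."""
    for instance in instances:
        if instance["name"] == preferred_name:
            return instance["name"]
    return instances[0]["name"]

# Hypothetical response body from GET /databridge/v1/sql-instances.
sample = [
    {"name": "databridge-1", "uid": "uid-1", "endpoint": "endpoint-1"},
    {"name": "databridge-2", "uid": "uid-2", "endpoint": "endpoint-2"},
]
```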
Step 2: Initiate multi-part upload job
Once you have the name of your SQL Server instance, you can use the Upload database artifacts service (`POST /databridge/v1/sql-instances/{instanceName}/databases/{databaseName}/{fileExtension}/init-upload`) to initiate a multi-part upload job.
The service takes three path parameters: `instanceName`, `databaseName`, and `fileExtension`. The `instanceName` parameter specifies the name of a SQL Server instance; use the value returned by the Get SQL Server instances service in Step 1. The `databaseName` parameter specifies the name that will be given to the new database on the SQL Server instance. The `fileExtension` parameter identifies the file type of the uploaded database artifact: one of `mdf`, `bak`, or `dacpac`.
In the example, we create a multi-part upload job to transfer an MDF database artifact file to the cloud and to create a new database named `my_edm` on our SQL Server instance.
C# example

```csharp
public static void Main(string[] args)
{
    // Previous steps omitted. See Step 1
    var fileExtension = "mdf";
    var edmName = "my_edm";
    var uploadId = GenerateUploadId(baseUrl, apiKey, sqlInstance, fileExtension, edmName);
}

private static string GenerateUploadId(string baseUrl, string token, string sqlInstance, string fileExtension, string databaseName)
{
    var request = (HttpWebRequest)WebRequest.Create($"{baseUrl}/databridge/v1/sql-instances/{sqlInstance}/databases/{databaseName}/{fileExtension}/init-upload");
    request.Method = "POST";
    request.Headers.Add("Authorization", token);

    var response = (HttpWebResponse)request.GetResponse();
    using (var streamReader = new StreamReader(response.GetResponseStream()))
    {
        var content = streamReader.ReadToEnd();
        var jsonResponse = JObject.Parse(content);
        return jsonResponse["uploadId"].ToString();
    }
}
```
Python example

```python
def generate_upload_id(base_url, headers, database_name, file_extension, sql_instance):
    url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/init-upload"
    response = requests.post(url, headers=headers)
    response.raise_for_status()
    return response.json()["uploadId"]

if __name__ == "__main__":
    # Previous steps omitted. See Step 1
    database_name = "my_python_edm"
    file_extension = "mdf"
    upload_id = generate_upload_id(base_url, auth_header, database_name, file_extension, sql_instance)
```
If successful, the service constructs a request for uploading the database artifact to Data Bridge and returns an upload job ID.
Step 3: Upload file chunks to S3 buckets
Now that you have an upload ID for the multi-part upload job, you can break the database artifact file into discrete chunks of data and transfer those chunks individually to the cloud using the Update data files service (`POST /databridge/v1/sql-instances/{instanceName}/databases/{databaseName}/{fileExtension}/upload-part/{uploadId}/{partNumber}`).
First, we read the database artifact from a local directory and break the file data into discrete chunks. Each chunk is identified by a unique `partNumber`. We iterate over the data and write the parts to a buffer byte array. If the database artifact file is 100 MB in size and you break it into 20 MB chunks, you can expect the loop to run five times.
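You can check that arithmetic by rounding the division up, so that a trailing partial chunk still counts as a part:

```python
import math

file_size_bytes = 100 * 1024 * 1024   # 100 MB artifact
chunk_size_bytes = 20 * 1024 * 1024   # 20 MB chunks

# Round up: a final chunk smaller than chunk_size_bytes is still a part.
part_count = math.ceil(file_size_bytes / chunk_size_bytes)
print(part_count)  # → 5
```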
Use the Get pre-signed URL service (`GET /databridge/v1/sql-instances/{instanceName}/databases/{databaseName}/{fileExtension}/upload-part/{uploadId}/{partNumber}`) to retrieve a pre-signed URL for uploading data to AWS. The pre-signed URL provides temporary access to an S3 bucket. Data is transferred as an octet stream to the pre-signed URL.
Responses from the service are collected in the `etags` dictionary. We collect an ETag from each part of the multi-part upload job and will use these ETags in the next phase of the workflow.

S3 buckets

The `GET /databridge/v1/sql-instances/{instanceName}/databases/{databaseName}/import` service enables you to fetch the path to an S3 bucket and temporary security credentials that enable you to upload the database artifact to that S3 bucket.

The service takes two required path parameters: `instanceName` and `databaseName`. The `instanceName` specifies the name of the managed SQL Server instance (for example, `databridge-1`). The `databaseName` specifies the name of the database artifact (for example, `newDbconsumerbak`).
In C#, we break the file into chunks using a `FileStream` and read specified chunks of the file into the buffer before uploading each chunk.
C# example

```csharp
public static void Main(string[] args)
{
    // Previous steps omitted. See Step 1 and Step 2
    var localFilePath = "my_edm.mdf";
    var bufferSizeInBytes = 20 * 1024 * 1024;
    UploadFilesUsingMultiPartUpload(baseUrl, apiKey, edmName, localFilePath, fileExtension, bufferSizeInBytes, sqlInstance, uploadId);
}

private static void UploadFilesUsingMultiPartUpload(string baseUrl, string token, string databaseName, string localFilePath, string fileExtension, int bufferSizeInBytes, string sqlInstanceName, string uploadId)
{
    var fileStream = File.OpenRead(localFilePath);
    var partNumber = 1;
    var etags = new Dictionary<string, string>();
    while (true)
    {
        var buffer = new byte[bufferSizeInBytes];
        var bytesRead = fileStream.Read(buffer, 0, buffer.Length); // Read a chunk from the file the size of the buffer
        if (bytesRead == 0)
        {
            break;
        }
        // Get partial upload URL
        var uploadUrl = GetPresignedPartialURL(baseUrl, token, databaseName, fileExtension, sqlInstanceName, uploadId, partNumber);
        // Upload chunk to URL
        etags.Add(partNumber.ToString(), UploadByteArray(buffer, uploadUrl));
        partNumber++;
    }
}

private static string GetPresignedPartialURL(string baseUrl, string token, string databaseName, string fileExtension, string sqlInstanceName, string uploadId, int partNumber)
{
    var request = (HttpWebRequest)WebRequest.Create($"{baseUrl}/databridge/v1/sql-instances/{sqlInstanceName}/databases/{databaseName}/{fileExtension}/upload-part/{uploadId}/{partNumber}");
    request.Method = "GET";
    request.Headers.Add("Authorization", token);

    var response = (HttpWebResponse)request.GetResponse();
    var uploadUrl = "";
    using (var streamReader = new StreamReader(response.GetResponseStream()))
    {
        var content = streamReader.ReadToEnd();
        uploadUrl = content.ToString();
    }
    return uploadUrl;
}

private static string UploadByteArray(byte[] buffer, string uploadUrl)
{
    var uploadRequest = (HttpWebRequest)WebRequest.Create(uploadUrl);
    uploadRequest.Method = "PUT";
    uploadRequest.ContentType = "application/octet-stream";
    uploadRequest.ContentLength = buffer.Length;
    using (var dataStream = uploadRequest.GetRequestStream())
    {
        dataStream.Write(buffer, 0, buffer.Length);
    }

    var uploadResponse = (HttpWebResponse)uploadRequest.GetResponse();
    if (uploadResponse.StatusCode != HttpStatusCode.OK)
    {
        throw new Exception("Unable to upload chunk");
    }
    return uploadResponse.Headers["ETag"].Replace("\"", "");
}
```
Python example

```python
def upload_files_using_multi_part_upload(base_url, headers, database_name, local_file_path, file_extension, chunk_size_bytes, sql_instance, upload_id):
    etags = {}
    with open(local_file_path, 'rb') as db_file:
        chunk = db_file.read(chunk_size_bytes)
        part_number = 1
        while chunk:
            url = get_presigned_url(base_url, headers, database_name, file_extension, sql_instance, upload_id, part_number)
            etag = upload_chunk_to_url(url, chunk)
            etags[str(part_number)] = etag
            chunk = db_file.read(chunk_size_bytes)
            part_number = part_number + 1

def upload_chunk_to_url(url, chunk):
    headers = {
        "Content-Type": "application/octet-stream"
    }
    response = requests.put(url, headers=headers, data=chunk)
    response.raise_for_status()
    return response.headers["Etag"].strip('"')

def get_presigned_url(base_url, headers, database_name, file_extension, sql_instance, upload_id, part_number):
    url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/upload-part/{upload_id}/{part_number}"
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.text

if __name__ == "__main__":
    # Previous steps omitted. See Step 1 and Step 2
    chunk_size = 20 * 1024 * 1024
    local_path = "my_python_edm.mdf"
    upload_files_using_multi_part_upload(base_url, auth_header, database_name, local_path, file_extension, chunk_size, sql_instance, upload_id)
```
For each chunk uploaded to the Amazon S3 bucket, the response includes an ETag header, which we store in the `etags` dictionary keyed by part number.
Step 4: Complete file upload
Now that the entire file has been uploaded to the Amazon S3 bucket, use the Complete file upload service (`POST /databridge/v1/sql-instances/{instanceName}/databases/{databaseName}/{fileExtension}/complete-upload`) to complete the multi-part upload process.
As before, the service requires the `instanceName`, `databaseName`, and `fileExtension` path parameters. We add the `uploadId` and the `etags` that we collected while uploading chunks and send them as part of the payload to Data Bridge.
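For reference, the request body is a small JSON document pairing the upload ID with the collected ETags; a minimal sketch of its shape, using placeholder values:

```python
import json

# Placeholder values for illustration only; real values come from
# init-upload (uploadId) and the per-part upload responses (ETags).
payload = {
    "uploadId": "example-upload-id",
    "etags": {
        "1": "etag-for-part-1",
        "2": "etag-for-part-2",
    },
}

body = json.dumps(payload)
```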
C# example

```csharp
private static void UploadFilesUsingMultiPartUpload(string baseUrl, string token, string databaseName, string localFilePath, string fileExtension, int bufferSizeInBytes, string sqlInstanceName, string uploadId)
{
    // Previous steps omitted. See Step 1, Step 2, and Step 3
    CompleteUpload(baseUrl, token, databaseName, fileExtension, sqlInstanceName, uploadId, etags);
}

private static void CompleteUpload(string baseUrl, string token, string databaseName, string fileExtension, string sqlInstanceName, string uploadId, Dictionary<string, string> etags)
{
    var completeRequest = (HttpWebRequest)WebRequest.Create($"{baseUrl}/databridge/v1/sql-instances/{sqlInstanceName}/databases/{databaseName}/{fileExtension}/complete-upload");
    completeRequest.Method = "POST";
    completeRequest.Headers.Add("Authorization", token);
    using (var streamWriter = new StreamWriter(completeRequest.GetRequestStream()))
    {
        var json = string.Format("{{\"uploadId\": \"{0}\" , \"etags\": {1}}}", uploadId, JsonConvert.SerializeObject(etags));
        streamWriter.Write(json);
    }

    var completeResponse = (HttpWebResponse)completeRequest.GetResponse();
    if (completeResponse.StatusCode != HttpStatusCode.OK)
    {
        throw new Exception("Unable to complete upload");
    }
}
```
Python example

```python
def upload_files_using_multi_part_upload(base_url, headers, database_name, local_file_path, file_extension, chunk_size_bytes, sql_instance, upload_id):
    # Previous steps omitted. See Step 1, Step 2, and Step 3
    complete_url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/complete-upload"
    payload = {
        "uploadId": upload_id,
        "etags": etags
    }
    response = requests.post(complete_url, headers=headers, json=payload)
    response.raise_for_status()
```
Complete code examples
Complete C# example
This tutorial was created using .NET Core 3.1 and should also work with .NET 5. The only external dependency used in this tutorial is Newtonsoft.Json, which can be added as a NuGet package.
```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

namespace UploadSample
{
    public class MultiPartUploadProgram
    {
        public static void Main(string[] args)
        {
            var baseUrl = "https://api-euw1.rms.com";
            var apiKey = args[0];
            var fileExtension = "mdf";
            var edmName = "my_edm_93";
            var localFilePath = "my_edm.mdf";
            var bufferSizeInBytes = 20 * 1024 * 1024;

            var sqlInstance = GetSqlInstance(baseUrl, apiKey);
            var uploadId = GenerateUploadId(baseUrl, apiKey, sqlInstance, fileExtension, edmName);
            UploadFilesUsingMultiPartUpload(baseUrl, apiKey, edmName, localFilePath, fileExtension, bufferSizeInBytes, sqlInstance, uploadId);
        }

        private static void UploadFilesUsingMultiPartUpload(string baseUrl, string token, string databaseName, string localFilePath, string fileExtension, int bufferSizeInBytes, string sqlInstanceName, string uploadId)
        {
            var fileStream = File.OpenRead(localFilePath);
            var partNumber = 1;
            var etags = new Dictionary<string, string>();
            while (true)
            {
                var buffer = new byte[bufferSizeInBytes];
                var bytesRead = fileStream.Read(buffer, 0, buffer.Length); // Read a chunk from the file the size of the buffer
                if (bytesRead == 0)
                {
                    break;
                }
                // Get partial upload URL
                var uploadUrl = GetPresignedPartialURL(baseUrl, token, databaseName, fileExtension, sqlInstanceName, uploadId, partNumber);
                // Upload chunk to URL
                etags.Add(partNumber.ToString(), UploadByteArray(buffer, uploadUrl));
                partNumber++;
            }
            CompleteUpload(baseUrl, token, databaseName, fileExtension, sqlInstanceName, uploadId, etags);
        }

        private static string GetPresignedPartialURL(string baseUrl, string token, string databaseName, string fileExtension, string sqlInstanceName, string uploadId, int partNumber)
        {
            var request = (HttpWebRequest)WebRequest.Create($"{baseUrl}/databridge/v1/sql-instances/{sqlInstanceName}/databases/{databaseName}/{fileExtension}/upload-part/{uploadId}/{partNumber}");
            request.Method = "GET";
            request.Headers.Add("Authorization", token);

            var response = (HttpWebResponse)request.GetResponse();
            var uploadUrl = "";
            using (var streamReader = new StreamReader(response.GetResponseStream()))
            {
                var content = streamReader.ReadToEnd();
                uploadUrl = content.ToString();
            }
            return uploadUrl;
        }

        private static string UploadByteArray(byte[] buffer, string uploadUrl)
        {
            var uploadRequest = (HttpWebRequest)WebRequest.Create(uploadUrl);
            uploadRequest.Method = "PUT";
            uploadRequest.ContentType = "application/octet-stream";
            uploadRequest.ContentLength = buffer.Length;
            using (var dataStream = uploadRequest.GetRequestStream())
            {
                dataStream.Write(buffer, 0, buffer.Length);
            }

            var uploadResponse = (HttpWebResponse)uploadRequest.GetResponse();
            if (uploadResponse.StatusCode != HttpStatusCode.OK)
            {
                throw new Exception("Unable to upload chunk");
            }
            return uploadResponse.Headers["ETag"].Replace("\"", "");
        }

        private static void CompleteUpload(string baseUrl, string token, string databaseName, string fileExtension, string sqlInstanceName, string uploadId, Dictionary<string, string> etags)
        {
            var completeRequest = (HttpWebRequest)WebRequest.Create($"{baseUrl}/databridge/v1/sql-instances/{sqlInstanceName}/databases/{databaseName}/{fileExtension}/complete-upload");
            completeRequest.Method = "POST";
            completeRequest.Headers.Add("Authorization", token);
            using (var streamWriter = new StreamWriter(completeRequest.GetRequestStream()))
            {
                var json = string.Format("{{\"uploadId\": \"{0}\" , \"etags\": {1}}}", uploadId, JsonConvert.SerializeObject(etags));
                streamWriter.Write(json);
            }

            var completeResponse = (HttpWebResponse)completeRequest.GetResponse();
            if (completeResponse.StatusCode != HttpStatusCode.OK)
            {
                throw new Exception("Unable to complete upload");
            }
        }

        private static string GenerateUploadId(string baseUrl, string token, string sqlInstance, string fileExtension, string databaseName)
        {
            var request = (HttpWebRequest)WebRequest.Create($"{baseUrl}/databridge/v1/sql-instances/{sqlInstance}/databases/{databaseName}/{fileExtension}/init-upload");
            request.Method = "POST";
            request.Headers.Add("Authorization", token);

            var response = (HttpWebResponse)request.GetResponse();
            using (var streamReader = new StreamReader(response.GetResponseStream()))
            {
                var content = streamReader.ReadToEnd();
                var jsonResponse = JObject.Parse(content);
                return jsonResponse["uploadId"].ToString();
            }
        }

        private static string GetSqlInstance(string baseUrl, string token)
        {
            var request = (HttpWebRequest)WebRequest.Create($"{baseUrl}/databridge/v1/sql-instances");
            request.Method = "GET";
            request.Headers.Add("Authorization", token);

            var response = (HttpWebResponse)request.GetResponse();
            using (var streamReader = new StreamReader(response.GetResponseStream()))
            {
                var content = streamReader.ReadToEnd();
                var jsonResponse = JArray.Parse(content);
                return jsonResponse.First()["name"].ToString();
            }
        }
    }
}
```
Complete Python example
All code samples are written in Python 3.9. The tutorial requires one external dependency, the requests library, which can be installed with pip.
```python
import requests

def upload_files_using_multi_part_upload(base_url, headers, database_name, local_file_path, file_extension, chunk_size_bytes, sql_instance, upload_id):
    etags = {}
    with open(local_file_path, 'rb') as db_file:
        chunk = db_file.read(chunk_size_bytes)
        part_number = 1
        while chunk:
            url = get_presigned_url(base_url, headers, database_name, file_extension, sql_instance, upload_id, part_number)
            etag = upload_chunk_to_url(url, chunk)
            etags[str(part_number)] = etag
            chunk = db_file.read(chunk_size_bytes)
            part_number = part_number + 1
    complete_url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/complete-upload"
    payload = {
        "uploadId": upload_id,
        "etags": etags
    }
    response = requests.post(complete_url, headers=headers, json=payload)
    response.raise_for_status()

def upload_chunk_to_url(url, chunk):
    headers = {
        "Content-Type": "application/octet-stream"
    }
    response = requests.put(url, headers=headers, data=chunk)
    response.raise_for_status()
    return response.headers["Etag"].strip('"')

def get_presigned_url(base_url, headers, database_name, file_extension, sql_instance, upload_id, part_number):
    url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/upload-part/{upload_id}/{part_number}"
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.text

def get_sql_instance(base_url, headers):
    url = f"{base_url}/databridge/v1/sql-instances"
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.json()[0]["name"]

def generate_upload_id(base_url, headers, database_name, file_extension, sql_instance):
    url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/init-upload"
    response = requests.post(url, headers=headers)
    response.raise_for_status()
    return response.json()["uploadId"]

def get_auth_header(auth_key):
    headers = {
        "Authorization": auth_key
    }
    return headers

if __name__ == "__main__":
    base_url = "https://api-euw1.rms.com"
    api_key = "xxxxxxxxxx"
    database_name = "my_python_edm"
    local_path = "my_python_edm.mdf"
    file_extension = "mdf"
    chunk_size = 20 * 1024 * 1024
    auth_header = get_auth_header(api_key)
    sql_instance = get_sql_instance(base_url, auth_header)
    upload_id = generate_upload_id(base_url, auth_header, database_name, file_extension, sql_instance)
    upload_files_using_multi_part_upload(base_url, auth_header, database_name, local_path, file_extension, chunk_size, sql_instance, upload_id)
```