Import Large Databases

Import large databases to Data Bridge

Overview

The Data Bridge Import/Upload API defines operations that enable Moody's RMS customers to upload and import large databases (files greater than 5GB in size) to the cloud in multiple parts.

This process leverages Amazon Web Services (AWS) APIs to enable tenants to upload database artifacts (in BAK, DACPAC, or MDF format) to storage buckets on AWS in multiple parts. Once uploaded to the cloud, the tenant can use the Data Bridge Jobs API to import the database on a managed server instance on Data Bridge.

Moody's RMS recommends that you use multi-part upload for all data integration projects. Although this workflow is more complicated than a single-stream upload, multi-part upload jobs provide greater throughput and better recovery from network issues.

Upload Optimizations

This tutorial does not discuss other optimizations that you should consider before implementing this process in production, such as multi-threaded streaming (which can increase upload throughput), failure recovery, and exception handling.

In the following sections, we illustrate this recipe using the C# and Python programming languages.

Step 1: Identify server instance

Use the Get server instances operation (GET /databridge/v1/sql-instances) to request information about the managed server instances available on Data Bridge. The operation takes no parameters.

As with all requests that use Data Bridge API operations, you must specify a host in the operation URI and provide a valid API key with the request. The {host} identifies the environment hosting your Data Bridge instance (one of api-euw1.rms.com or api-use1.rms.com) and the apiKey specifies an API key token. For details, see Authentication and Authorization.
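
For example, a request for server instances against the EU environment has the following general shape (the API key value is a placeholder):

GET https://api-euw1.rms.com/databridge/v1/sql-instances
Authorization: <<YOUR API KEY HERE>>
Accept: application/json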

In the example, we assume that the tenant has a single managed server instance.

public static async Task Main(string[] args)
{
    using(HttpClient client = new HttpClient()) 
    {
        string baseUrl = "https://api-euw1.rms.com";
        string apiKey = "<<YOUR API KEY HERE>>";
        client.BaseAddress = new Uri(baseUrl);

        //Set the auth API key
        client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue(apiKey);
        var sqlInstance = await GetFirstSqlInstance(client);
    }
}

private static async Task<string> GetFirstSqlInstance(HttpClient client)
{
    using(HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Get, "databridge/v1/sql-instances")) 
    {
        request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
        using(HttpResponseMessage response = await client.SendAsync(request)) 
        {
            if (response.IsSuccessStatusCode) 
            {
                using(HttpContent content = response.Content)
                {
                    var jsonString = await content.ReadAsStringAsync();
                    var jsonResponse = JArray.Parse(jsonString);
                    return jsonResponse[0]["name"].ToString();
                }
            }
            throw new Exception("Unable to get SQL insance names. HTTP status Code:" +
                    response.StatusCode);
        }
    }
}
import requests
import time

def get_auth_header(auth_key):
    headers = {
        "Authorization": auth_key
    }
    return headers

def get_sql_instance(base_url, headers):
    url = f"{base_url}/databridge/v1/sql-instances"
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.json()[0]["name"]

if __name__ == "__main__":
    base_url = "https://api-euw1.rms.com"
    api_key = "xxx"
    auth_header = get_auth_header(api_key)
    sql_instance = get_sql_instance(base_url, auth_header)

The operation returns an array of managed server instances; each entry includes name, uid, and endpoint attributes. Use the value of the name attribute to identify a server instance in Step 2.
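
For illustration, a response has the following general shape (the values are placeholders, not real instance data):

[
  {
    "name": "my_sql_instance",
    "uid": "<instance uid>",
    "endpoint": "<instance endpoint>"
  }
]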

Step 2: Initiate multi-part upload job

Once you have the name of your server instance, you can use the Initiate multipart upload operation (POST /databridge/v1/sql-instances/{instanceName}/databases/{databaseName}/{fileExtension}/init-upload) to initiate a multi-part upload job.

The operation takes three path parameters: instanceName, databaseName, and fileExtension:

  • The instanceName parameter specifies the name of a server instance. Use the value returned in Step 1.
  • The databaseName parameter specifies the name that will be given to the new database on the server instance.
  • The fileExtension parameter identifies the file type of the uploaded database artifact: one of mdf, bak, or dacpac.
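
For example, using a hypothetical instance name of my_sql_instance (substitute the name returned in Step 1), a database name of my_edm, and an MDF artifact, the request path is:

POST /databridge/v1/sql-instances/my_sql_instance/databases/my_edm/mdf/init-upload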

In the example, we create a multi-part upload job to transfer an MDF database artifact file to the cloud and to create a new database named my_edm on our server instance.


public static async Task Main(string[] args)
{
    using(HttpClient client = new HttpClient()) 
    {
        // Previous steps omitted here
        var edmName = "my_edm";
        var fileExtensionLiteral = "edm";
        var uploadId = await GenerateUploadId(client, sqlInstance, fileExtensionLiteral, edmName);
    }
}
private static async Task<string> GenerateUploadId(HttpClient client, string sqlInstance,
      string fileExtension, string databaseName) 
{
    using(HttpResponseMessage response =
        await client.PostAsync($"databridge/v1/sql-instances/{sqlInstance}/databases/{databaseName}/{fileExtension}/init-upload", null)) 
    {
        if (response.IsSuccessStatusCode) 
        {
            using(HttpContent content = response.Content) 
            {
                var jsonString = await content.ReadAsStringAsync();
                var jsonResponse = JObject.Parse(jsonString);
                return jsonResponse["uploadId"].ToString();
            }
        }
        throw new Exception("Unable to get upload ID. HTTP status Code:" + response.StatusCode);
    }
}
def generate_upload_id(base_url, headers, database_name, file_extension, sql_instance):
    url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/init-upload"
    response = requests.post(url, headers=headers)
    response.raise_for_status()
    return response.json()["uploadId"]
if __name__ == "__main__":
    # Previous steps omitted here
    database_name = "my_python_edm"
    file_extension = "mdf"
    upload_id = generate_upload_id(base_url, auth_header, database_name, file_extension, sql_instance)

If successful, the operation constructs a request for uploading the database artifact to Data Bridge and returns an upload job ID.
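
For illustration, the response body has the following general shape (the value is a placeholder):

{
  "uploadId": "<upload ID>"
}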

Step 3: Upload file chunks to storage buckets

Now that you have an upload ID for the multi-part upload job, you can break the database artifact file into discrete chunks of data, acquire a pre-signed URI to a storage bucket on AWS, and transfer chunks of data to that storage bucket.

  1. First, we read the database artifact from a local directory and break the file data into discrete chunks. Each chunk is identified by a unique partNumber. We iterate over the data and write the parts to a buffer byte array. If the database artifact file is 100 MB in size and you break it into 20 MB chunks, you can expect the loop to run five times (see the short sketch after this list).
  2. Use the Get pre-signed URI for multipart upload operation (GET /databridge/v1/sql-instances/{instanceName}/databases/{databaseName}/{fileExtension}/upload-part/{uploadId}/{partNumber}) to retrieve a pre-signed URI for uploading data to AWS. The pre-signed URI provides temporary access to a storage bucket. Data is transferred as an octet stream to the pre-signed URI.
  3. Once you have the pre-signed URI, use the Upload data part by number operation (PUT /databridge/v1/sql-instances/{instanceName}/databases/{databaseName}/{fileExtension}/upload-part/{uploadId}/{partNumber}) to upload each chunk of the database artifact.
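
As a quick sanity check on chunk counts, the following standalone Python sketch (not part of the tutorial code) computes how many parts a file of a given size produces:

import math

file_size_bytes = 100 * 1024 * 1024   # example: 100 MB artifact
chunk_size_bytes = 20 * 1024 * 1024   # 20 MB chunks, as in the examples below
part_count = math.ceil(file_size_bytes / chunk_size_bytes)
print(part_count)  # 5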

Each successfully uploaded part returns an ETag header. We collect these etags in the etags dictionary, keyed by part number, and use them in Step 4 to complete the upload.

In C#, we break the file into chunks using a FileStream and read specified chunks of the file into the buffer before uploading the chunk.

public static async Task Main(string[] args)
{
    using (HttpClient client = new HttpClient())
    {
        // Previous steps omitted here
        var localFilePath = "my_edm.mdf";
        var bufferSizeInBytes = 20 * 1024 * 1024;
        string fileExtensionLiteral = "mdf";
        var etags = await UploadFilesUsingMultiPartUpload(client, edmName, localFilePath, 
          fileExtensionLiteral, bufferSizeInBytes, sqlInstance, uploadId);       
    }
}
private static async Task<Dictionary<string, string>> 
        UploadFilesUsingMultiPartUpload(HttpClient client,
        string databaseName, string localFilePath, string fileExtension, 
        int bufferSizeInBytes, string sqlInstanceName, string uploadId)
    {
        using (FileStream fileStream = File.OpenRead(localFilePath))
        {
            var partNumber = 1;
            var etags = new Dictionary<string, string>();
            while (true)
            {
                var buffer = new byte[bufferSizeInBytes];
                // Read a chunk from the file the size of the buffer
                var bytesRead = fileStream.Read(buffer, 0, buffer.Length); 
                if (bytesRead == 0)
                {
                    break;
                }
                // Get partial upload URL
                var uploadUrl = await GetPresignedPartialURL(client, databaseName,
                    fileExtension, sqlInstanceName, uploadId, partNumber);
                // Upload chunk to URL                 
                etags.Add(partNumber.ToString(),
                    await UploadByteArray(buffer, bytesRead, uploadUrl));
                partNumber++;
            }
            return etags;
        }
    }
private async static Task<string> GetPresignedPartialURL(HttpClient client, string
        databaseName, string fileExtension, string sqlInstanceName, string uploadId,
        int partNumber)
    {
        using (HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Get,
            $"databridge/v1/sql-instances/{sqlInstanceName}/databases/{databaseName}/{fileExtension}/upload-part/{uploadId}/{partNumber}"))
            {
                request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
                using (HttpResponseMessage response = await client.SendAsync(request))
                {
                    using (HttpContent content = response.Content)
                    {
                        var url = await content.ReadAsStringAsync();
                        return url.Trim('"');
                    }
                }
            }
    }
private async static Task<string> UploadByteArray(byte[] buffer, int length,
        string uploadUrl)
    {
        using (HttpClient client = new HttpClient())
        {
            using (ByteArrayContent content = new ByteArrayContent(buffer, 0, length))
            {
                content.Headers.Add("Content-Type", "application/octet-stream");
                using (HttpResponseMessage response =
                    await client.PutAsync(uploadUrl, content))
                {
                    if (response.IsSuccessStatusCode)
                    {
                        return response.Headers.ETag.Tag.Trim('"');
                    }
                    throw new Exception("Unable to upload chunk. HTTP status Code:" +
                        response.StatusCode);
                }
            }
        }
    }
def upload_files_using_multi_part_upload(base_url, headers, database_name, local_file_path, file_extension, chunk_size_bytes, sql_instance, upload_id):
    etags = {}
    with open(local_file_path, 'rb') as db_file:
        chunk = db_file.read(chunk_size_bytes)
        part_number = 1
        while chunk:
            url = get_presigned_url(base_url, headers, database_name, file_extension, sql_instance, upload_id, part_number)
            etag = upload_chunk_to_url(url, chunk)
            etags[str(part_number)] = etag
            chunk = db_file.read(chunk_size_bytes)
            part_number = part_number + 1
def upload_chunk_to_url(url, chunk):
    headers = {
        "Content-Type": "application/octet-stream"
    }
    response = requests.put(url, headers=headers, data=chunk)
    response.raise_for_status()
    return response.headers["Etag"].strip('\"')
def get_presigned_url(base_url, headers, database_name, file_extension, sql_instance, upload_id, part_number):
    url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/upload-part/{upload_id}/{part_number}"
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.text
if __name__ == "__main__":
    # Previous steps omitted here
    chunk_size = 20 * 1024 * 1024
    local_path = "my_python_edm.mdf"
    upload_files_using_multi_part_upload(base_url, auth_header, database_name, local_path, file_extension, chunk_size, sql_instance, upload_id)

For each chunk uploaded to the Amazon storage bucket, the operation returns an ETag header. We collect these values in the etags dictionary, keyed by part number.

Step 4: Complete file upload

Now that the entire file has been uploaded to the Amazon storage bucket, use the Complete multipart upload operation (POST /databridge/v1/sql-instances/{instanceName}/databases/{databaseName}/{fileExtension}/complete-upload) to complete the multi-part upload process.

As before, the operation requires the instanceName, databaseName, and fileExtension path parameters. We send the uploadId from Step 2 and the etags collected in Step 3 as the request payload.
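
For illustration, the payload sent to the complete-upload operation has the following general shape (the values are placeholders):

{
  "uploadId": "<upload ID from Step 2>",
  "etags": {
    "1": "<etag for part 1>",
    "2": "<etag for part 2>"
  }
}

The examples below also attach the uploaded artifact as a database on the server instance: they call the import operation (POST /databridge/v1/sql-instances/{instanceName}/Databases/{databaseName}/import?importFrom={fileFormat}, where fileFormat is 0 for MDF and 1 for BAK) and then poll the Jobs API until the import job reports Succeeded.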

public static async Task Main(string[] args)
{
    using (HttpClient client = new HttpClient())
    {
        // Previous steps omitted here
        await CompleteUpload(client, edmName, fileExtensionLiteral, sqlInstance,
            uploadId, etags);
        await AttachDatabase(client, sqlInstance, edmName, /*MDF = 0, BAK = 1*/0);
    }
}
private static async Task CompleteUpload(HttpClient client, string databaseName, 
    string fileExtension, string sqlInstanceName, string uploadId, 
    Dictionary<string, string> etags)
    {            
        using (HttpContent payload = new StringContent(
            string.Format("{{\"uploadId\": \"{0}\" , \"etags\": {1}}}", uploadId, 
            JsonConvert.SerializeObject(etags)), Encoding.UTF8, "application/json"))                
        {
            using (HttpResponseMessage response = await client.PostAsync($"databridge/v1/sql-instances/{sqlInstanceName}/databases/{databaseName}/{fileExtension}/complete-upload", payload))
                {
                    if (!response.IsSuccessStatusCode)
                    {
                        throw new Exception("Unable to complete upload. HTTP status Code:" + response.StatusCode);
                    }
                }
        }            
    }
private static async Task AttachDatabase(HttpClient client,string instanceName, string edmName, int fileFormat)
    {
        using (HttpResponseMessage response = await client.PostAsync($"databridge/v1/sql-instances/{instanceName}/Databases/{edmName}/import?importFrom={fileFormat}", null))
            {
                if (response.IsSuccessStatusCode)
                {                    
                    using (HttpContent content = response.Content)
                    {
                        var jsonString = await content.ReadAsStringAsync();
                        var jsonResponse = JObject.Parse(jsonString);
                        string jobId = jsonResponse["jobId"].ToString();
                        // poll until job is complete
                        await PollJobStatus(client, jobId);
                    }
                }
            }
    }
private static async Task PollJobStatus(HttpClient client, string jobId, 
    int sleepIntervalInMilliseconds = 5000, int maxWaitTimeInMilliseconds = 300000)
{
    string status;
    int totalWaitTime = 0;
    while (totalWaitTime < maxWaitTimeInMilliseconds)
    {
        // Query Job API
        using (HttpResponseMessage response =
            await client.GetAsync($"databridge/v1/Jobs/{jobId}"))
        {
            using (HttpContent content = response.Content)
            {
                status = await content.ReadAsStringAsync();
                if (status == "Succeeded")
                {
                    break;
                }
                Thread.Sleep(sleepIntervalInMilliseconds);
                totalWaitTime += sleepIntervalInMilliseconds;
            }
        }                
    }
}
def upload_files_using_multi_part_upload(base_url, headers, database_name, local_file_path, file_extension, chunk_size_bytes, sql_instance, upload_id):
    # Previous steps omitted
    complete_url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/complete-upload"
    payload = {
        "uploadId": upload_id,
        "etags": etags
    }
    response = requests.post(complete_url, headers=headers, json=payload)
    response.raise_for_status()
def attach_database(base_url, headers, sql_instance, database_name, fileType):
    url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/Databases/{database_name}/import?importFrom={fileType}"
    response = requests.post(url, headers=headers)
    response.raise_for_status()
    jobId = response.json()["jobId"]
    status = 'InProgress'
    totalWaitTime = 0
    while(totalWaitTime < 300): 
        response = requests.get(f"{base_url}/databridge/v1/Jobs/{jobId}", headers=headers)
        status = response.text
        if status != 'Succeeded':
            time.sleep(5)
            totalWaitTime += 5
        else:
            break

Complete code examples

Complete C# example

This tutorial was created using .NET Core 3.1 and should also work with .NET 5. The only external dependency used in this tutorial is Newtonsoft.Json, which can be added as a NuGet package.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System.Threading.Tasks;
using System.Net.Http.Headers;
using System.Globalization;
using System.Threading;
using System.Text;

namespace UploadSample
{
    public class MultiPartUploadProgram
    {
        public enum FileType
        {
            MDF = 0,
            BAK = 1,
            DACPAC = 2
        }
        public static async Task Main(string[] args)
        {
            using (HttpClient client = new HttpClient())
            {
                FileType fileExtension;                
                string baseUrl = "https://api-euw1.rms.com";
                string apiKey = args[0];
                var edmName = args[1];
                var localFilePath = args[2];
                // override baseURL if defined
                if (args.Length > 3)
                {
                    baseUrl = args[3];
                }
                client.BaseAddress = new Uri(baseUrl);
                var bufferSizeInBytes = 20 * 1024 * 1024;
                if (localFilePath.EndsWith("mdf", true, CultureInfo.InvariantCulture))
                {                    
                    fileExtension = FileType.MDF;
                }
                else if (localFilePath.EndsWith("bak", true, CultureInfo.InvariantCulture))
                {                    
                    fileExtension = FileType.BAK;
                }
                else
                {
                    Console.WriteLine("Invalid File extension. Supported extensions are .mdf and .bak ");
                    return;
                }
                string fileExtensionLiteral = Enum.GetName(typeof(FileType), fileExtension).ToLowerInvariant();
                //set the auth API key
                client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue(apiKey);
                var sqlInstance = await GetSqlInstance(client);
                var uploadId = await GenerateUploadId(client, sqlInstance, fileExtensionLiteral, edmName);
                var etags = await UploadFilesUsingMultiPartUpload(client, edmName, localFilePath, fileExtensionLiteral, bufferSizeInBytes, sqlInstance, uploadId);
                await CompleteUpload(client, edmName, fileExtensionLiteral, sqlInstance, uploadId, etags);
                await AttachDatabase(client, sqlInstance, edmName, fileExtension);
            }
        }
        private static async Task AttachDatabase(HttpClient client,string instanceName, string edmName, FileType fileFormat)
        {
            int fileExtensionValue = (int)fileFormat;
            using (HttpResponseMessage response = await client.PostAsync($"databridge/v1/sql-instances/{instanceName}/Databases/{edmName}/import?importFrom={fileExtensionValue}", null))
            {
                if (response.IsSuccessStatusCode)
                {                    
                    using (HttpContent content = response.Content)
                    {
                        var jsonString = await content.ReadAsStringAsync();
                        var jsonResponse = JObject.Parse(jsonString);
                        string jobId = jsonResponse["jobId"].ToString();
                        // poll until job is complete
                        await PollJobStatus(client, jobId);
                    }
                }
            }
        }
        private static async Task PollJobStatus(HttpClient client, string jobId, int sleepIntervalInMilliseconds = 5000, int maxWaitTimeInMilliseconds = 300000)
        {
            string status;
            int totalWaitTime = 0;
            while (totalWaitTime < maxWaitTimeInMilliseconds)
            {
                // Query Job API
                using (HttpResponseMessage response = await client.GetAsync($"databridge/v1/Jobs/{jobId}"))
                {
                    using (HttpContent content = response.Content)
                    {
                        status = await content.ReadAsStringAsync();
                        if (status == "Succeeded")
                        {
                            break;
                        }
                        Thread.Sleep(sleepIntervalInMilliseconds);
                        totalWaitTime += sleepIntervalInMilliseconds;
                    }
                }                
            }
        }
        private static async Task<Dictionary<string, string>> UploadFilesUsingMultiPartUpload(HttpClient client, string databaseName, string localFilePath, string fileExtension, int bufferSizeInBytes, string sqlInstanceName, string uploadId)
        {
            using (FileStream fileStream = File.OpenRead(localFilePath))
            {
                var partNumber = 1;
                var etags = new Dictionary<string, string>();
                while (true)
                {
                    var buffer = new byte[bufferSizeInBytes];
                    var bytesRead = fileStream.Read(buffer, 0, buffer.Length); // Read a chunk from the file the size of the buffer
                    if (bytesRead == 0)
                    {
                        break;
                    }
                    // Get partial upload URL
                    var uploadUrl = await GetPresignedPartialURL(client, databaseName, fileExtension, sqlInstanceName, uploadId, partNumber);
                    // Upload chunk to URL                 
                    etags.Add(partNumber.ToString(), await UploadByteArray(buffer, bytesRead, uploadUrl));
                    partNumber++;
                }
                return etags;
            }
        }
        private async static Task<string> GetPresignedPartialURL(HttpClient client, string databaseName, string fileExtension, string sqlInstanceName, string uploadId, int partNumber)
        {          
            using (HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Get, $"databridge/v1/sql-instances/{sqlInstanceName}/databases/{databaseName}/{fileExtension}/upload-part/{uploadId}/{partNumber}"))
            {
                request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));                
                using (HttpResponseMessage response = await client.SendAsync(request))
                {
                    using (HttpContent content = response.Content)
                    {
                        var url = await content.ReadAsStringAsync();
                        return url.Trim('"');
                    }
                }
            }
        }
        private async static Task<string> UploadByteArray(byte[] buffer, int length, string uploadUrl)
        {
            using (HttpClient client = new HttpClient()) 
            {
                using (ByteArrayContent content = new ByteArrayContent(buffer, 0, length))
                {
                    content.Headers.Add("Content-Type", "application/octet-stream");
                    using (HttpResponseMessage response = await client.PutAsync(uploadUrl, content))
                    {
                        if (response.IsSuccessStatusCode)
                        {                            
                            return response.Headers.ETag.Tag.Trim('"');                            
                        }
                        throw new Exception("Unable to upload chunk. HTTP status Code:" + response.StatusCode);
                    }
                }
            }
        }
        private static async Task CompleteUpload(HttpClient client, string databaseName, string fileExtension, string sqlInstanceName, string uploadId, Dictionary<string, string> etags)
        {            
            using (HttpContent payload = new StringContent(string.Format("{{\"uploadId\": \"{0}\" , \"etags\": {1}}}", uploadId, JsonConvert.SerializeObject(etags)), Encoding.UTF8, "application/json"))                
            {
                using (HttpResponseMessage response = await client.PostAsync($"databridge/v1/sql-instances/{sqlInstanceName}/databases/{databaseName}/{fileExtension}/complete-upload", payload))
                {
                    if (!response.IsSuccessStatusCode)
                    {
                        throw new Exception("Unable to complete upload. HTTP status Code:" + response.StatusCode);
                    }
                }
            }            
        }
        private static async Task<string> GenerateUploadId(HttpClient client,  string sqlInstance, string fileExtension, string databaseName)
        {            
            using (HttpResponseMessage response = await client.PostAsync($"databridge/v1/sql-instances/{sqlInstance}/databases/{databaseName}/{fileExtension}/init-upload", null))
            {
                if (response.IsSuccessStatusCode)
                {
                    using (HttpContent content = response.Content)
                    {
                        var jsonString = await content.ReadAsStringAsync();
                        var jsonResponse = JObject.Parse(jsonString);
                        return jsonResponse["uploadId"].ToString();
                    }
                }
                throw new Exception("Unable to get upload ID. HTTP status Code:" + response.StatusCode);
            }
        }
        private static async Task<string> GetSqlInstance(HttpClient client)
        {
            using (HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Get, "databridge/v1/sql-instances"))
            {
                request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
                using (HttpResponseMessage response = await client.SendAsync(request))
                {
                    if (response.IsSuccessStatusCode)
                    {
                        using (HttpContent content = response.Content)
                        {
                            var jsonString = await content.ReadAsStringAsync();
                            var jsonResponse = JArray.Parse(jsonString);
                            return jsonResponse[0]["name"].ToString();
                        }
                    }
                    throw new Exception("Unable to get SQL insance names. HTTP status Code:" + response.StatusCode);
                }
            }
        }
    }
}

Complete Python example

All code samples are written in Python 3.9. The tutorial requires one external dependency, the requests library, which can be installed with pip.

import requests
import time

def upload_files_using_multi_part_upload(base_url, headers, database_name, local_file_path, file_extension, chunk_size_bytes, sql_instance, upload_id):
    etags = {}
    with open(local_file_path, 'rb') as db_file:
        chunk = db_file.read(chunk_size_bytes)
        part_number = 1        
        while chunk:        
            url = get_presigned_url(base_url, headers, database_name, file_extension, sql_instance, upload_id, part_number)
            etag = upload_chunk_to_url(url, chunk)
            etags[str(part_number)] = etag
            chunk = db_file.read(chunk_size_bytes)
            part_number = part_number + 1
    complete_url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/complete-upload"
    payload = {
        "uploadId": upload_id,
        "etags": etags
    }
    response = requests.post(complete_url, headers=headers, json=payload)
    response.raise_for_status()
def upload_chunk_to_url(url, chunk):
    headers = {
        "Content-Type": "application/octet-stream"
    }
    response = requests.put(url, headers=headers, data=chunk)
    response.raise_for_status()
    return response.headers["Etag"].strip('\"')
def get_presigned_url(base_url, headers, database_name, file_extension, sql_instance, upload_id, part_number):
    url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/upload-part/{upload_id}/{part_number}"
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.text
def get_sql_instance(base_url, headers):
    url = f"{base_url}/databridge/v1/sql-instances"
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.json()[0]["name"]
def generate_upload_id(base_url, headers, database_name, file_extension, sql_instance):
    url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/init-upload"
    response = requests.post(url, headers=headers)
    response.raise_for_status()
    return response.json()["uploadId"]
def attach_database(base_url, headers, sql_instance, database_name, fileType):
    url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/Databases/{database_name}/import?importFrom={fileType}"
    response = requests.post(url, headers=headers)
    response.raise_for_status()
    jobId = response.json()["jobId"]
    status = 'InProgress'
    totalWaitTime = 0
    while(totalWaitTime < 300): 
        response = requests.get(f"{base_url}/databridge/v1/Jobs/{jobId}", headers=headers)
        status = response.text
        if status != 'Succeeded':
            time.sleep(5)
            totalWaitTime += 5
        else:
            break
def get_auth_header(auth_key):
    headers = {
        "Authorization": auth_key
    }
    return headers
if __name__ == "__main__":
    base_url = "https://api-euw1.rms.com"
    api_key = "***"
    database_name = "my_python_edm"
    local_path = "my_python_edm.mdf"
    file_extension = "mdf"
    chunk_size = 20 * 1024 * 1024
    auth_header = get_auth_header(api_key)
    sql_instance = get_sql_instance(base_url, auth_header)
    upload_id = generate_upload_id(base_url, auth_header, database_name, file_extension, sql_instance)
    upload_files_using_multi_part_upload(base_url, auth_header, database_name, local_path, file_extension, chunk_size, sql_instance, upload_id)
    attach_database(base_url, auth_header, sql_instance, database_name, 0)