Working with REST APIs

Globally distributed applications need a geo-distributed fast data platform that can transparently replicate data anywhere in the world, so that applications can operate on a copy of the data close to their users. Similarly, applications need geo-replicated and local streams to handle pub-sub, ETL, and real-time updates from the fast data platform.

Macrometa GDN is a geo-distributed real-time data service with turnkey global distribution and transparent multi-master replication. You can run globally distributed, low-latency workloads with GDN. This article is an introduction to using GDN via its REST APIs.

Note

If you are new to Macrometa GDN, we strongly recommend reading Essentials of Macrometa GDN.

Prerequisites

A tenant account (and credentials) with Macrometa GDN.

API Browser

Your best friend when working with REST APIs is the REST API browser available in the GDN GUI. From there, you can execute the various REST APIs and see exactly what the inputs and outputs are.

GDN API Browser

Working with Documents

A document is a dictionary/object that is JSON serializable with the following properties:

  • Contains the _key field, which identifies the document uniquely within a specific collection.
  • Contains the _id field (also called the handle), which identifies the document uniquely across all collections within a fabric. This ID is a combination of the collection name and the document key using the format {collection}/{key} (see example below).
  • Contains the _rev field. GDN supports MVCC (Multi-Version Concurrency Control) and can store each document in multiple revisions. The latest revision of a document is indicated by this field. The field is populated by GDN and is not required as input unless you want to validate a document against its current revision.

Here is an example of a valid document:

    {
        '_id': 'students/bruce',
        '_key': 'bruce',
        '_rev': '_Wm3dzEi--_',
        'first_name': 'Bruce',
        'last_name': 'Wayne',
        'address': {
            'street' : '1007 Mountain Dr.',
            'city': 'Gotham',
            'state': 'NJ'
        },
        'is_rich': True,
        'friends': ['robin', 'gordon']
    }

Constants

Let's define the following constants, which will be used throughout the tutorial.

import json
import requests

# Constants

FEDERATION = "api-gdn1.macrometa.io"
FED_URL = "https://{}".format(FEDERATION)
EMAIL = "guest@macrometa.io"
PASSWORD = "guest"
FABRIC = "_system"
COLLECTION_NAME = "testcollection"
SUBSCRIPTION_NAME = "my-sub"
AUTH_TOKEN = "bearer "

Create an HTTPS Session

# Create a HTTPS Session

url = "{}/_open/auth".format(FED_URL)
payload = {
    'email':EMAIL,
    'password':PASSWORD
    }
headers = {
    'content-type': 'application/json'
    }

response = requests.post(url, data = json.dumps(payload), headers = headers)

if response.status_code == 200:
    resp_body = json.loads(response.text)
    AUTH_TOKEN += resp_body["jwt"]
    TENANT = resp_body["tenant"]
else:
    raise Exception("Error while getting auth token. Code:{}, Reason:{}".format(response.status_code,response.reason))


session = requests.session()
session.headers.update({"content-type": 'application/json'})
session.headers.update({"authorization": AUTH_TOKEN})

Get list of all Regions

# Get List of all regions

url = FED_URL + "/_api/datacenter/all"
dcl_resp = session.get(url)
dcl_list = json.loads(dcl_resp.text)
regions = []
for dcl in dcl_list:
    dcl_url = dcl['tags']['url']
    regions.append(dcl_url)
print(regions)

Create Document Collection

Create a test collection. Collection type = 2 for documents. Collection type = 3 for edges.

# Create a document collection
url = FED_URL + "/_api/collection"
payload = {
     "name": COLLECTION_NAME,
    "type": 2
}
resp = session.post(url, data = json.dumps(payload))
print(resp.text)

Using C8QL

# Using C8QL

url = FED_URL + "/_api/cursor"

# Insert documents to the collection
resp = session.post(url, json={
    "query": "INSERT{'name' : 'Julie', 'company' : 'ABC', '_key' : 'Julie'}" \
            "INTO testcollection"
})

# Read from the collection
resp = session.post(url, json={
    "query": "FOR doc IN testcollection RETURN doc" 
})

# Update documents in the collection
resp = session.post(url, json={
    "query": "FOR c IN testcollection UPDATE {'company':'XYZ'} IN testcollection"
})

# Delete documents in the collection
resp = session.post(url, json={
    "query": "FOR c IN testcollection REMOVE c IN testcollection"
})

Summary

Below is the complete program if you want to play around with it.


import json
import requests

# Constants
FEDERATION = "api-gdn1.macrometa.io"
FED_URL = "https://{}".format(FEDERATION)
EMAIL = "guest@macrometa.io"
PASSWORD = "guest"
FABRIC = "_system"
COLLECTION_NAME = "testcollection"
SUBSCRIPTION_NAME = "my-sub"
AUTH_TOKEN = "bearer "

# Create a HTTPS Session

url = "{}/_open/auth".format(FED_URL)
payload = {
    'email':EMAIL,
    'password':PASSWORD
    }
headers = {
    'content-type': 'application/json'
    }

response = requests.post(url, data = json.dumps(payload), headers = headers)

if response.status_code == 200:
    resp_body = json.loads(response.text)
    AUTH_TOKEN += resp_body["jwt"]
    TENANT = resp_body["tenant"]
else:
    raise Exception("Error while getting auth token. Code:{}, Reason:{}".format(response.status_code,response.reason))

session = requests.session()
session.headers.update({"content-type": 'application/json'})
session.headers.update({"authorization": AUTH_TOKEN})

# Create a test collection. collection type - 2 for document, 3 for edge

url = FED_URL + "/_api/collection"
payload = {
     "name": COLLECTION_NAME,
    "type": 2
}
resp = session.post(url, data = json.dumps(payload))
print(resp.text)

# Using C8QL do CRUD operations on the collection.

url = FED_URL + "/_api/cursor"

# Insert documents to the collection
resp = session.post(url, json={
    "query": "INSERT{'name' : 'Julie', 'company' : 'ABC', '_key' : 'Julie'}" \
            "INTO testcollection"
})

# Read from the collection
resp = session.post(url, json={
    "query": "FOR doc IN testcollection RETURN doc" 
})

# Update documents in the collection
resp = session.post(url, json={
    "query": "FOR c IN testcollection UPDATE {'company':'XYZ'} IN testcollection"
})

# Delete documents in the collection
resp = session.post(url, json={
    "query": "FOR c IN testcollection REMOVE c IN testcollection"
})


# Create User

# Note: To use this API, you need to have users enabled from feature gates

url = FED_URL + "/_fabric/_system/_api/user"
user = session.post(url=url, json={
    "active": True,
    "passwd": "guest123",
    "user": "guest@test.com"
})
print(user.text)


# Create Fabric
# Note: To use this API, you need to have fabrics enabled from feature gates

# Get the list of regions (datacenters) the new fabric should be replicated to
url = FED_URL + "/_api/datacenter/all"
dcl_resp = session.get(url)
dcl_list = json.loads(dcl_resp.text)
regions = [dcl['tags']['url'] for dcl in dcl_list]

url = FED_URL + "/_api/database"
db = session.post(url, json={
    "name": "testfabric",
    "options": {
        "dcList": regions,
        "realTime": True
    },
    "users": [
        {
            "active": True,
            "extra": {},
            "passwd": "",
            "username": ""
        }
    ]
})

print(db.text)

Working with Graphs

Edge documents (edges) are similar to standard documents but have two additional required fields, _from and _to. The values of these fields must be the handles of the "from" and "to" vertex documents linked by the edge document in question. Here is an example of a valid edge document:

    {
        '_id': 'friends/001',
        '_key': '001',
        '_rev': '_Wm3dyle--_',
        '_from': 'students/john',
        '_to': 'students/jane',
        'closeness': 9.5
    }

A graph consists of vertices and edges. Edges are stored as documents in edge collections. A vertex can be a document of a document collection or of an edge collection (so edges can also be used as vertices). Which collections are used within a named graph is defined via edge definitions; a named graph can contain more than one edge definition, but it needs at least one. Graphs allow you to structure your models in line with your domain, group them logically in collections, and query them together in graph queries.
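For example, an edge definition for the friends edges shown above could look like the following sketch; it names the edge collection first, then the vertex collections its edges may start from and point to (the collection names here are illustrative):

    {
        'collection': 'friends',
        'from': ['students'],
        'to': ['students']
    }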

In SQL you commonly have the construct of a relation table to store n:m relations between two data tables. An edge collection is somewhat similar to these relation tables. Vertex collections resemble the data tables with the objects to connect.

While simple graph queries with a fixed number of hops via the relation table may be doable in SQL with several nested joins, graph databases can handle an arbitrary number of these hops over edge collections - this is called a traversal. Also, edges in one edge collection may point to several vertex collections. It's common to have attributes attached to edges, e.g. a label naming the interconnection.

Edges have a direction, with their relations _from and _to pointing from one document to another document stored in vertex collections. In queries you can define in which directions the edge relations may be followed (see the example query after this list):

  • OUTBOUND: follows edges from _from to _to
  • INBOUND: follows edges from _to to _from
  • ANY: follows edges in both directions
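To make the directions and hop counts concrete, here is a minimal C8QL traversal sketch. It reuses the constants, session, collection, and graph set up in the sections below, and assumes the cursor endpoint accepts the same /_fabric/{fabric_name} prefix as the other endpoints in this section:

# Traverse 1 to 2 hops outbound from a start vertex over the named graph.
# Change OUTBOUND to INBOUND or ANY to follow edges in the other directions.
url = FEDERATION_URL + "/_fabric/{}/_api/cursor".format(GEO_FABRIC)
resp = session.post(url, json={
    "query": "FOR v, e IN 1..2 OUTBOUND 'employees/Jean' GRAPH 'my-graph' "
             "RETURN {vertex: v, edge: e}"
})
print(json.loads(resp.text))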

Constants

Let's define the following constants, which will be used throughout the tutorial.

FEDERATION_URL = "https://api-gdn1.macrometa.io"

TENANT_NAME = "guest@macrometa.io"
TENANT_USER = "root"
TENANT_PWD = "guest"
GEO_FABRIC =  "_system"

COLLECTION_NAME = "employees"
EDGE_COLL_NAME = "friends"
GRAPH_NAME = "my-graph"

Generate JWT Token

url = "%s/_open/auth" % region
payload = {"password": password, "email": tenant_email}
response = requests.post(url, json=payload)
if response.status_code not in range(200, 210):
    print("URL: %s\nPayload: %s\nMethod: POST" % (url, payload))
    raise RuntimeError("getting token failed with status_code: %s",
                        response.status_code)
json_data = json.loads(response.text)
token = json_data["jwt"]

Authorize The Session

Now that we have retrieved a JWT token from the /_open/auth endpoint, we can add it to the session and use that session throughout.

Below are the steps to create the session:

session = requests.Session()
session.headers.update({"Authorization":"Bearer "+token})

Create Document Collection

Create a collection using the endpoint /_fabric/{fabric_name}/_api/collection.

The example below shows the steps:

payload = { 'name': COLLECTION_NAME }

url = FEDERATION_URL + "/_fabric/{}/_api/collection".format(GEO_FABRIC)
resp = session.post(url,data=json.dumps(payload))
result = json.loads(resp.text)
print(result)

Create Edge Collection

An edge collection contains edge documents and shares its namespace with all other types of collections. You can manage edge documents via standard collection API wrappers, but using edge collection API wrappers provides additional safeguards:

  • All modifications are executed in transactions.
  • Edge documents are checked against the edge definitions on insert.

To create an edge collection, use the same endpoint /_fabric/{fabric_name}/_api/collection and pass type: 3 in the payload.

The steps to create an edge collection are shown below:

payload = { 'name': EDGE_COLL_NAME, 'type': 3 }  # type 3 = edge collection

url = FEDERATION_URL + "/_fabric/{}/_api/collection".format(GEO_FABRIC)
resp = session.post(url,data=json.dumps(payload))
result = json.loads(resp.text)
print(result)

You can also manage edges via the graph API wrappers, but you must use document IDs (handles) instead of keys where applicable.
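Because edge documents are just documents with _from and _to, one way to add an edge is through the document API used in the next section. Here is a minimal sketch (the vertex documents referenced here are inserted in the next step, and the closeness value is illustrative):

# Insert an edge from Jean to James into the edge collection.
# _from and _to must be document handles ({collection}/{key}), not bare keys.
edge = {
    '_from': '{}/Jean'.format(COLLECTION_NAME),
    '_to': '{}/James'.format(COLLECTION_NAME),
    'closeness': 9.5
}

url = FEDERATION_URL + "/_fabric/{}/_api/document/{}".format(GEO_FABRIC, EDGE_COLL_NAME)
resp = session.post(url, data=json.dumps(edge))
print(json.loads(resp.text))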

Insert Documents

Let's insert documents into the employees collection using the endpoint POST /_fabric/{fabric_name}/_api/document/{collection_name} as shown below.

payload = [
    {
        '_key':'Jean',
        'firstname': 'Jean',
        'lastname':'Picard',
        'email':'jean.picard@macrometa.io'
    },
    {
        '_key':'James',
        'firstname': 'James',
        'lastname':'Kirk',
        'email':'james.kirk@macrometa.io'
    },
    {
        '_key': 'Han',
        'firstname': 'Han',
        'lastname':'Solo',
        'email':'han.solo@macrometa.io'
    },
    {
        '_key': 'Bruce',
        'firstname': 'Bruce',
        'lastname':'Wayne',
        'email':'bruce.wayne@macrometa.io'
    }
]

url = FEDERATION_URL + "/_fabric/{}/_api/document/{}".format(GEO_FABRIC,COLLECTION_NAME)
resp = session.post(url,data=json.dumps(payload))
result = json.loads(resp.text)
print(result)

Create Graph

A graph consists of vertices and edges. Vertices are stored as documents in vertex collections, and edges are stored as documents in edge collections. The collections used in a graph and their relations are specified with edge definitions: each edge definition names an edge collection, plus the vertex collections that its edges may start from and point to.

A graph can be created using the endpoint POST /_fabric/{fabric_name}/_api/graph as shown below:

payload = {
  "edgeDefinitions": [
    {
      "collection": EDGE_COLL_NAME,
      "from": [
        COLLECTION_NAME
      ],
      "to": [
        COLLECTION_NAME
      ]
    }
  ],
  "name": GRAPH_NAME
}

url = FEDERATION_URL + "/_fabric/{}/_api/graph".format(GEO_FABRIC)
resp = session.post(url,data=json.dumps(payload))
result = json.loads(resp.text)
print(result)

Graph Traversals

We can retrieve the edges connected to a vertex using the GET /_fabric/{fabric_name}/_api/edges/{collection_name} endpoint (where {collection_name} is an edge collection) by providing the following query parameters:

  • vertex: The id of the start vertex.
  • direction: Selects in or out direction for edges. If not set, any edges are returned.

Below are the steps to get the inbound edges of a vertex. To get outbound edges, use direction=out.

params = {
    "vertex": "{}/Jean".format(COLLECTION_NAME),
    "direction": "in"
}

url = FEDERATION_URL + "/_fabric/{}/_api/edges/{}".format(GEO_FABRIC, EDGE_COLL_NAME)

resp = session.get(url,params=params)
result = json.loads(resp.text)
print(result)

Delete Graph

To delete a graph, use the endpoint DELETE /_fabric/{fabric_name}/_api/graph/{graph_name} as shown in the steps below:

params = {"dropCollection": False}

url = FEDERATION_URL + "/_fabric/{}/_api/graph/{}".format(GEO_FABRIC,GRAPH_NAME)

resp = session.delete(url, params=params)
result = json.loads(resp.text)
print(result)

Pub-Sub with Streams

GDN Streams is a high-performance solution for server-to-server messaging. It provides:

  • Seamless geo-replication of messages across regions.
  • Very low publish and end-to-end latency.
  • Seamless scalability to over a million topics.
  • Multiple subscription modes (exclusive, shared, and failover) for streams.
  • Guaranteed message delivery with persistent message storage.

Streams are built on the publish-subscribe pattern, aka pub-sub. In this pattern, producers publish messages to streams. Consumers can then subscribe to those streams, process incoming messages, and send an acknowledgement when processing is complete.

Once a subscription has been created, all messages will be retained by Streams, even if the consumer gets disconnected. Retained messages will be discarded only when a consumer acknowledges that they have been successfully processed.

Messages are the basic "unit" of Streams. They're what producers publish to streams and what consumers then consume from streams (and acknowledge when the message has been processed). Messages are the analogue of letters in a postal service system.

Each message has the following components (see the example after this list):

  • Value / data payload: The data carried by the message. All messages carry raw bytes.
  • Key: Messages can optionally be tagged with keys, which can be useful for things like stream compaction.
  • Properties: An optional key/value map of user-defined properties.
  • Producer name: The name of the producer that produced the message (producers are automatically given default names, but you can apply your own explicitly as well).
  • Sequence ID: Each message belongs to an ordered sequence on its stream. A message's sequence ID is its ordering in that sequence.
  • Publish time: The timestamp of when the message was published (automatically applied by the producer).
  • Event time: An optional timestamp that applications can attach to the message, representing when something happened, e.g. when the message was processed. The event time of a message is 0 if none is explicitly set.
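To make these components concrete, here is a minimal sketch of the JSON body a producer sends over the WebSocket API used later in this tutorial; the payload carries base64-encoded raw bytes, and the property names and values are illustrative:

import base64
import json

# A producer message: base64-encoded payload plus optional user-defined
# properties and an application-defined context (values are illustrative).
message = {
    'payload': base64.b64encode(b'Hello World').decode('utf-8'),
    'properties': {
        'key1': 'value1',
        'key2': 'value2'
    },
    'context': 5
}
print(json.dumps(message))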

Constants

Let's define the following constants, which will be used throughout the tutorial.

import base64
import json

import requests
import websocket

FEDERATION_URL = "https://gdn1.macrometa.io"

TENANT_NAME = "guest@macrometa.io"
TENANT_USER = "root"
TENANT_PWD = "guest"
GEO_FABRIC =  "_system"
SUBSCRIPTION_NAME = "my-sub"

Create an HTTPS Session

session = requests.Session()
# requests basic auth takes a (username, password) pair
session.auth = (TENANT_NAME, TENANT_PWD)

Get list of all Regions

url = FEDERATION_URL + "/_database/_api/datacenter/all"
dcl_resp = session.get(url)
dcl_list = json.loads(dcl_resp.text)
regions = []
for dcl in dcl_list:
    dcl_url = dcl['tags']['url']
    regions.append(dcl_url)

Create Geo-Replicated Stream

Create a persistent geo-replicated stream.

url = FEDERATION_URL + "/_tenant/{}/_db/{}/streams/persistent/stream/{}".format(TENANT_NAME, GEO_FABRIC, "stream1")
stream = session.post(url)
print(stream.text)

Publish & Subscribe Messages to the Stream

Publish messages to the stream created above.

# The WebSocket endpoints live on the federation host (strip the scheme).
FEDERATION_NAME = FEDERATION_URL.replace("https://", "")

stream_url = 'wss://' + FEDERATION_NAME + '/c8/_ws/ws/v2/producer/persistent/' + \
             TENANT_NAME + '/' + FEDERATION_NAME + '.' + GEO_FABRIC + '/' + "stream1"
ws = websocket.create_connection(stream_url)
ws.send(json.dumps({
    'payload': base64.b64encode(b'Hello World').decode('utf-8'),
    'properties': {
        'key1': 'value1',
        'key2': 'value2'
    },
    'context': 5
}))
response = json.loads(ws.recv())
if response['result'] == 'ok':
    print('Message published successfully')
else:
    print('Failed to publish message:', response)
ws.close()

Subscribe to the stream and print the messages.

stream_url = 'wss://' + FEDERATION_NAME + '/c8/_ws/ws/v2/consumer/persistent/' + \
             TENANT_NAME + '/' + FEDERATION_NAME + '.' + GEO_FABRIC + '/' + "stream1" + '/' + SUBSCRIPTION_NAME

ws = websocket.create_connection(stream_url)
while True:
    msg = json.loads(ws.recv())
    if not msg: break

    print("received: {} - payload: {}".format(msg, base64.b64decode(msg['payload'])))
    # Acknowledge successful processing
    ws.send(json.dumps({'messageId': msg['messageId']}))

ws.close()

Summary

Below is the complete program if you want to play around with it.

import base64
import json
import requests
import websocket

# Constants defined which are used throughout the tutorial.
FEDERATION_URL = "https://gdn1.macrometa.io"

TENANT_NAME = "guest@macrometa.io"
TENANT_USER = "root"
TENANT_PWD = "guest"
GEO_FABRIC = "_system"
SUBSCRIPTION_NAME = "my-sub"

# Create an HTTPS session (requests basic auth takes a (username, password) pair)
session = requests.Session()
session.auth = (TENANT_NAME, TENANT_PWD)

# Create persistent global stream
url = FEDERATION_URL + "/_tenant/{}/_db/{}/streams/persistent/stream/{}".format(TENANT_NAME, GEO_FABRIC, "stream1")
persistent_stream = session.post(url)

# Publish messages to a stream
# The WebSocket endpoints live on the federation host (strip the scheme).
FEDERATION_NAME = FEDERATION_URL.replace("https://", "")

stream_url = 'wss://' + FEDERATION_NAME + '/c8/_ws/ws/v2/producer/persistent/' + \
             TENANT_NAME + '/' + FEDERATION_NAME + '.' + GEO_FABRIC + '/' + "stream1"
ws = websocket.create_connection(stream_url)
ws.send(json.dumps({
    'payload': base64.b64encode(b'Hello World').decode('utf-8'),
    'properties': {
        'key1': 'value1',
        'key2': 'value2'
    },
    'context': 5
}))
response = json.loads(ws.recv())
if response['result'] == 'ok':
    print('Message published successfully')
else:
    print('Failed to publish message:', response)
ws.close()

# Subscribe to a stream
stream_url = 'wss://' + FEDERATION_NAME + '/c8/_ws/ws/v2/consumer/persistent/' + \
             TENANT_NAME + '/' + FEDERATION_NAME + '.' + GEO_FABRIC + '/' + "stream1" + '/' + SUBSCRIPTION_NAME
ws = websocket.create_connection(stream_url)
while True:
    msg = json.loads(ws.recv())
    if not msg: break
    print("received: {} - payload: {}".format(msg, base64.b64decode(msg['payload'])))
    # Acknowledge successful processing
    ws.send(json.dumps({'messageId': msg['messageId']}))

ws.close()

Stream Processing

TBD.
