Global Pub-Sub with Streams

Overview

Globally distributed applications need a geo-distributed fast data platform that can transparently replicate data anywhere in the world, so that each application operates on a copy of the data that is close to its users. Similarly, applications need geo-replicated and local streams to handle pub-sub, ETL, and real-time updates from the fast data platform.

Macrometa GDN is a fully managed geo-distributed data service with turnkey global distribution and transparent multi-master replication. You can run globally distributed, low-latency workloads within GDN. This article is an introduction to using GDN with the pyC8 (Python) and jsC8 (JavaScript) drivers.

GDN Streams is a high-performance solution for server-to-server messaging. It provides:

  • Seamless geo-replication of messages across regions
  • Very low publish and end-to-end latency
  • Seamless scalability to over a million topics
  • Multiple subscription modes (exclusive, shared, and failover) for streams
  • Guaranteed message delivery with persistent message storage
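To make the three subscription modes concrete, here is a small in-memory Python model of how a subscription might hand messages to its consumers. It is a sketch under simplifying assumptions, not the GDN API: the `Subscription` class and its methods are hypothetical, and treating the earliest-attached consumer as the active one in failover mode is a simplification (the real broker's selection rule may differ).

```python
class Subscription:
    """Toy in-memory model of one subscription (hypothetical; not the GDN API)."""

    MODES = ("exclusive", "shared", "failover")

    def __init__(self, mode):
        if mode not in self.MODES:
            raise ValueError(f"unknown subscription mode: {mode}")
        self.mode = mode
        self.consumers = []  # attached consumer names, in attach order
        self._cursor = 0     # round-robin position used by shared mode

    def attach(self, consumer):
        # Exclusive mode admits a single consumer; a second attach is rejected.
        if self.mode == "exclusive" and self.consumers:
            raise RuntimeError("exclusive subscription already has a consumer")
        self.consumers.append(consumer)

    def detach(self, consumer):
        self.consumers.remove(consumer)

    def route(self, message):
        """Return the name of the consumer that receives `message`."""
        if not self.consumers:
            raise RuntimeError("no consumer attached")
        if self.mode == "shared":
            # Shared: messages are spread round-robin over all consumers.
            target = self.consumers[self._cursor % len(self.consumers)]
            self._cursor += 1
            return target
        # Exclusive and failover: one active consumer receives everything.
        # Simplification: the earliest-attached consumer is active, and the
        # next one takes over if it detaches (failover).
        return self.consumers[0]


shared = Subscription("shared")
shared.attach("c1")
shared.attach("c2")
print([shared.route(m) for m in range(4)])  # ['c1', 'c2', 'c1', 'c2']

failover = Subscription("failover")
failover.attach("c1")
failover.attach("c2")
failover.detach("c1")
print(failover.route("m"))  # c2
```

Shared mode trades ordering for throughput (consecutive messages go to different consumers), while exclusive and failover keep one consumer reading the whole sequence.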

Streams are built on the publish-subscribe (pub-sub) pattern. In this pattern, producers publish messages to streams, and consumers subscribe to those streams, process incoming messages, and send an acknowledgement when processing is complete.

Once a subscription has been created, Streams retains all messages for it, even if the consumer disconnects. Retained messages are discarded only after a consumer acknowledges that they have been processed successfully.
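The retention rule can be modeled with a per-subscription backlog. The sketch below is a hypothetical in-memory `ToyStream`, not how GDN stores messages; in particular, `receive` simply returns the oldest unacknowledged message again until it is acknowledged, which stands in for redelivery.

```python
from collections import OrderedDict


class ToyStream:
    """Toy in-memory stream with per-subscription retention (not the GDN API)."""

    def __init__(self):
        self.backlogs = {}  # subscription name -> OrderedDict(message id -> payload)
        self._next_id = 0

    def subscribe(self, name):
        # Retention for a subscription starts when the subscription is created.
        self.backlogs.setdefault(name, OrderedDict())

    def publish(self, payload):
        self._next_id += 1
        # Every existing subscription keeps its own copy until it acknowledges.
        for backlog in self.backlogs.values():
            backlog[self._next_id] = payload
        return self._next_id

    def receive(self, name):
        """Oldest unacknowledged (message id, payload), or None if nothing is retained."""
        return next(iter(self.backlogs[name].items()), None)

    def ack(self, name, message_id):
        # Only an acknowledgement discards a retained message.
        self.backlogs[name].pop(message_id, None)


toy = ToyStream()
toy.subscribe("demosub")
toy.publish("m1")  # retained even though no consumer is reading yet
toy.publish("m2")
msg_id, payload = toy.receive("demosub")
print(msg_id, payload)          # 1 m1
toy.ack("demosub", msg_id)
print(toy.receive("demosub"))   # (2, 'm2')
```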

Messages are the basic unit of Streams. Producers publish messages to streams, and consumers consume them from streams and acknowledge each message once it has been processed. Messages are the analogue of letters in a postal service.

| Component | Purpose |
|---|---|
| Value / data payload | The data carried by the message. All messages carry raw bytes. |
| Key | An optional key used to tag messages; useful for things like stream compaction. |
| Properties | An optional key/value map of user-defined properties. |
| Producer name | The name of the producer that published the message. Producers are given default names automatically, but you can also set your own. |
| Sequence ID | Each message belongs to an ordered sequence on its stream; the sequence ID is the message's position in that sequence. |
| Publish time | The timestamp of when the message was published (applied automatically by the producer). |
| Event time | An optional timestamp that applications can attach to represent when something happened, for example when the message was processed. It is 0 if not set explicitly. |
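The table maps naturally onto a record type. The dataclass below is an illustrative model only (the field names, the default producer name, and millisecond timestamps are assumptions), together with a hypothetical `compact` helper showing what key-based compaction means: only the newest message for each key survives. How GDN treats messages without a key during compaction is not covered here; the sketch simply keeps them.

```python
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional


def _now_ms():
    return int(time.time() * 1000)


@dataclass
class Message:
    """Illustrative model of the message components above (not a GDN class)."""
    payload: bytes                      # value / data payload: always raw bytes
    sequence_id: int                    # position in the stream's ordered sequence
    producer_name: str = "producer-0"   # hypothetical default producer name
    key: Optional[str] = None           # optional key, e.g. for compaction
    properties: Dict[str, str] = field(default_factory=dict)  # user-defined map
    publish_time: int = field(default_factory=_now_ms)        # set at publish, in ms
    event_time: int = 0                 # 0 unless the application sets it


def compact(messages: List[Message]) -> List[Message]:
    """Keep only the newest message per key; keyless messages are kept in this sketch."""
    newest = {}
    for m in messages:
        if m.key is not None:
            newest[m.key] = max(newest.get(m.key, m.sequence_id), m.sequence_id)
    return [m for m in messages if m.key is None or newest[m.key] == m.sequence_id]


log = [
    Message(b"v1", 1, key="user-42"),
    Message(b"x", 2, key="user-7"),
    Message(b"v2", 3, key="user-42"),
]
print([m.payload for m in compact(log)])  # [b'x', b'v2']
```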

Note

If you are new to Macrometa GDN, we strongly recommend reading Essentials of Macrometa GDN.

Prerequisites

The examples below assume that your

  • tenant name is guest@macrometa.io, and
  • user password is guest.

Driver download

pyC8 requires Python 3.5+; Python 3.6 or higher is recommended.

To install pyC8, run:

    $ pip3 install pyC8

or, if you prefer to use conda:

    conda install -c conda-forge pyC8

or pipenv:

    pipenv install --pre pyC8

Once the installation process is finished, you can begin developing applications in Python.

To install jsC8 with Yarn or NPM:

    yarn add jsc8
    (or)
    npm install jsc8

If you want to use the driver outside of the current directory, you can also install it globally using the `--global` flag:

    npm install --global jsc8

Or build jsC8 from source:

    git clone https://github.com/macrometacorp/jsc8.git
    cd jsc8
    npm install
    npm run dist

Connect to GDN

The first step in using GDN is to establish a connection to a local region. When this code executes, it initializes the server connection to the region URL you specified.

Python:

    from c8 import C8Client

    print("Connect to C8...")
    client = C8Client(protocol='https', host='gdn1.macrometa.io', port=443)

JavaScript:

    const jsc8 = require("jsc8");
    const client = new jsc8("https://gdn1.macrometa.io");

Get GeoFabric Details

To get the details of a geo fabric:

Python:

    from c8 import C8Client

    client = C8Client(protocol='https', host='gdn1.macrometa.io', port=443)

    # Authenticate as the demo tenant
    demotenant = client.tenant(email='guest@macrometa.io', password='guest')
    print("Get geo fabric...")
    fabric = demotenant.useFabric('_system')
    print("Get geo fabric details...")
    print(fabric.fabrics_detail())

JavaScript:

    const jsc8 = require("jsc8");

    const client = new jsc8("https://gdn1.macrometa.io");

    async function getFabric() {
      console.log("Logging in...");
      await client.login("guest@macrometa.io", "guest");
      console.log("Using the tenant...");
      client.useTenant("guest");

      try {
        console.log("Using the fabric...");
        client.useFabric("_system");

        console.log("Getting the fabric details...");
        const result = await client.get();
        console.log("result is: ", result);
      } catch (e) {
        console.log("Fabric details could not be fetched because " + e);
      }
    }

    getFabric();

Create Global & Local Streams

A stream in GDN can be either local or geo-replicated (global). The examples below create one of each.


Python:

    from c8 import C8Client

    demo_stream = 'demostream'  # Name of the stream
    print("Create connection...")
    client = C8Client(protocol='https', host='gdn1.macrometa.io', port=443)

    print("Get tenant...")
    demotenant = client.tenant(email='guest@macrometa.io', password='guest')
    print("Get geo fabric...")
    fabric = demotenant.useFabric('_system')

    print("Create geo-replicated & local streams in _system...")
    fabric.create_stream(demo_stream, local=False)
    fabric.create_stream(demo_stream, local=True)

    print("Get streams...")
    streams = fabric.streams()
    print("streams:", streams)

JavaScript:

    const jsc8 = require("jsc8");

    const client = new jsc8("https://gdn1.macrometa.io");

    async function streams() {
      console.log("Logging in...");
      await client.login("guest@macrometa.io", "guest");
      console.log("Using the tenant...");
      client.useTenant("guest");

      try {
        console.log("Using the fabric...");
        client.useFabric("_system");

        // Second argument of client.stream(): true = local, false = geo-replicated
        console.log("Creating local stream...");
        const streamLocal = client.stream("testStream-local", true);
        await streamLocal.createStream();

        console.log("Creating global stream...");
        const streamGlobal = client.stream("testStream-global", false);
        await streamGlobal.createStream();
      } catch (e) {
        console.log("Streams could not be created because " + e);
      }
    }

    streams();
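To illustrate the difference between the two kinds of stream, here is a toy model of a fabric that spans several regions. It assumes, as a simplification, that a local stream exists and delivers messages only in the region that created it, while a geo-replicated stream copies every message to all regions. `ToyFabric` and the region names are hypothetical and not part of GDN.

```python
class ToyFabric:
    """Toy model of one fabric spanning several regions (not the GDN API)."""

    def __init__(self, regions):
        self.regions = list(regions)
        self.is_local = {}                          # stream name -> local flag
        self.logs = {r: {} for r in self.regions}   # region -> stream -> messages

    def _targets(self, name, region):
        # Local: only the given region. Geo-replicated: every region in the fabric.
        return [region] if self.is_local[name] else self.regions

    def create_stream(self, name, local, region):
        self.is_local[name] = local
        for r in self._targets(name, region):
            self.logs[r][name] = []

    def publish(self, name, region, message):
        if name not in self.logs[region]:
            raise KeyError(f"stream {name!r} does not exist in region {region!r}")
        for r in self._targets(name, region):
            self.logs[r][name].append(message)

    def read(self, name, region):
        return list(self.logs[region].get(name, []))


toy_fabric = ToyFabric(["region-a", "region-b"])  # hypothetical region names
toy_fabric.create_stream("demo-global", local=False, region="region-a")
toy_fabric.create_stream("demo-local", local=True, region="region-a")
toy_fabric.publish("demo-global", "region-a", "hello")
toy_fabric.publish("demo-local", "region-a", "hello")
print(toy_fabric.read("demo-global", "region-b"))  # ['hello']
print(toy_fabric.read("demo-local", "region-b"))   # []
```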

Publish Messages

The following examples publish messages to a stream. The stream can be either local or geo-replicated.

Python:

    from c8 import C8Client
    import base64
    import json
    import time
    import warnings
    warnings.filterwarnings("ignore")

    region = "gdn1.macrometa.io"
    demo_tenant = "guest@macrometa.io"
    demo_fabric = "_system"
    #--------------------------------------------------------------
    print("publish messages to stream...")
    client = C8Client(protocol='https', host=region, port=443)
    demotenant = client.tenant(email=demo_tenant, password='guest')
    fabric = demotenant.useFabric(demo_fabric)
    stream = fabric.stream()
    # Producer on the geo-replicated stream "demostream" (local=False)
    producer = stream.create_producer("demostream", local=False)
    for i in range(10):
        msg = "Hello from " + region + "(" + str(i) + ")"
        # Messages carry raw bytes: base64-encode the text into a JSON envelope
        payload = {
            "payload": base64.b64encode(msg.encode("utf-8")).decode("utf-8")
        }
        producer.send(json.dumps(payload))
        time.sleep(10)  # wait 10 seconds between messages

JavaScript:

    const jsc8 = require("jsc8");

    const client = new jsc8("https://gdn1.macrometa.io");

    async function streams() {
      console.log("Logging in...");
      await client.login("guest@macrometa.io", "guest");
      console.log("Using the tenant...");
      client.useTenant("guest");

      try {
        console.log("Using the fabric...");
        client.useFabric("_system");

        // Second argument of client.stream(): true = local, false = geo-replicated
        console.log("Getting the local stream...");
        const streamLocal = client.stream("testStream-local", true);

        console.log("Getting the global stream...");
        const streamGlobal = client.stream("testStream-global", false);

        for (let i = 0; i < 11; i++) {
          const msg = "Hello from user-->(" + i + ")";
          // Publish each message through the gdn1.macrometa.io region
          await streamLocal.producer(msg, "gdn1.macrometa.io");
          await streamGlobal.producer(msg, "gdn1.macrometa.io");
        }
      } catch (e) {
        console.log("Publishing could not be done because " + e);
      }
    }

    streams();
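The Python producer wraps each message as base64 text under a `"payload"` key in a JSON object. A small helper makes that envelope reusable; base64 keeps arbitrary bytes safe inside a JSON text message. `encode_message` is a hypothetical name, and the envelope shape is taken from the example above rather than from a driver specification.

```python
import base64
import json


def encode_message(data):
    """Build the JSON envelope used by the Python producer above (base64 under "payload")."""
    raw = data.encode("utf-8") if isinstance(data, str) else bytes(data)
    return json.dumps({"payload": base64.b64encode(raw).decode("ascii")})


print(encode_message("Hello"))      # {"payload": "SGVsbG8="}
print(encode_message(b"\x00\xff"))  # {"payload": "AP8="}
```

With such a helper, the loop body in the producer reduces to `producer.send(encode_message(msg))`.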

Subscribe to Stream

The following examples subscribe to a stream and consume its messages. The stream can be either local or geo-replicated.


Python:

    from c8 import C8Client
    import base64
    import json
    import warnings
    warnings.filterwarnings("ignore")

    region = "gdn1.macrometa.io"
    demo_tenant = "guest@macrometa.io"
    demo_fabric = "_system"
    #--------------------------------------------------------------
    print("consume messages from stream...")
    client = C8Client(protocol='https', host=region, port=443)
    demotenant = client.tenant(email=demo_tenant, password='guest')
    fabric = demotenant.useFabric(demo_fabric)
    stream = fabric.stream()
    # Choose the subscription mode with the consumer_type option.
    subscriber = stream.subscribe("demostream", local=False, subscription_name="demosub",
                                  consumer_type=stream.CONSUMER_TYPES.EXCLUSIVE)
    for i in range(10):
        # Parse the received JSON message (json.loads, not json.dumps)
        msg = json.loads(subscriber.recv())
        string_msg = base64.b64decode(msg['payload']).decode("utf-8")
        msg_id = msg['messageId']
        print("Received message '{}' id='{}'".format(string_msg, msg_id))

JavaScript:

    const jsc8 = require("jsc8");

    const fed_url = "https://gdn1.macrometa.io";
    const guest_email = "guest@macrometa.io";
    const guest_password = "guest";
    const geo_fabric = "_system";
    const client = new jsc8(fed_url);

    // Messages this consumer waits for before closing the connection
    const msgs = ["message 1", "message 2", "message 3"];
    let numberOfMessages = 0;

    async function setup() {
      console.log("Logging in...");
      await client.login(guest_email, guest_password);
      console.log("Using the fabric: " + geo_fabric);
      client.useFabric(geo_fabric);
    }

    // Returns the list of regions (data centers) the fabric spans
    async function getDCList() {
      const dcListAll = await client.listUserFabrics();
      const dcListObject = dcListAll.find((o) => o.name === geo_fabric);
      return dcListObject.options.dcList.split(",");
    }

    // Subscribes as "my-sub"; resolves once the consumer connection is open
    function receive(stream) {
      return new Promise((resolve) =>
        stream.consumer("my-sub", {
          onmessage: (msg) => {
            const { payload } = JSON.parse(msg);
            // Payloads arrive base64-encoded
            if (msgs.includes(Buffer.from(payload, "base64").toString())) {
              numberOfMessages++;
            }
            // Close the connection once every expected message has arrived
            if (numberOfMessages === msgs.length) {
              stream.closeConnections();
            }
          },
          onopen: () => {
            console.log("consumer connection is open");
            resolve();
          },
        }, "gdn1.macrometa.io")
      );
    }

    (async function () {
      await setup();
      const dcList = await getDCList();
      console.log("dcList: ", dcList);
      // The second argument tells if the stream is local (true) or global (false)
      const stream = client.stream("testStream", false);
      await stream.createStream();
      // Subscribe to the stream
      await receive(stream);
    })();
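On the receiving side, both examples reverse that envelope: parse the JSON, base64-decode `payload`, and read `messageId`. The helper below is a sketch of that step; `decode_message` is a hypothetical name, and the frame shape is assumed from the fields the subscriber examples read.

```python
import base64
import json


def decode_message(frame):
    """Return (text, message id) from a received frame.

    Assumes the frame is JSON (str, bytes, or an already-parsed dict) with a
    base64 "payload" and a "messageId", the fields the subscribers above read.
    """
    msg = frame if isinstance(frame, dict) else json.loads(frame)
    text = base64.b64decode(msg["payload"]).decode("utf-8")
    return text, msg["messageId"]


frame = '{"payload": "SGVsbG8=", "messageId": "example-id-1"}'  # hypothetical frame
print(decode_message(frame))  # ('Hello', 'example-id-1')
```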