Pub-Sub with Streams

This guide shows how to create geo-replicated streams and use them for queues and pub-sub messaging with local latencies across the globe.


Let's assume your

  • Tenant name is <your-tenant-name> and
  • User password is guest.

Driver download

pyC8 requires Python 3.5+. Python 3.6 or higher is recommended.

To install pyC8, simply run:

    $ pip3 install pyC8

or, if you prefer to use conda:

    conda install -c conda-forge pyC8

or pipenv:

    pipenv install --pre pyC8

Once the installation process is finished, you can begin developing applications in Python.

With Yarn or NPM

    yarn add jsc8
    npm install jsc8

If you want to use the driver outside of the current directory, you can also install it globally using the `--global` flag:

    npm install --global jsc8

From source:

    git clone <jsC8-repository-url>
    cd jsC8
    npm install
    npm run dist

Code Sample

from c8 import C8Client
import random
import threading
import time
import json
import base64
import six

# Variables
# NOTE(review): global_url and email are empty strings in this copy of the sample;
# fill in your federation URL and tenant email before running.
global_url = "" # The request will be automatically routed to closest location.
email = ""
password = "guest"  # same guest password as stated in the introduction and used by the JavaScript sample
geo_fabric = "testfabric"
# The stream name carries a random numeric suffix between 1 and 10000.
stream_name = "stream"+ str(random.randint(1, 10000))

def create_subscriber():
    # Subscriber routine (used as the target of the subscriber thread in the main
    # section): subscribes to `stream_name` through the module-level `client` and
    # reads ten messages, printing each decoded payload.
    # NOTE(review): this copy of the sample is truncated. The `client.subscribe(...)`
    # call below is never closed, and the code that follows the `messageId` check
    # is cut off mid-expression. Restore the missing lines from the pyC8
    # documentation before running.
    print("\n ------- SUBSCRIBE TOPIC  ------")

    print("Subscribe to stream: {}".format(stream_name))
    # NOTE(review): remaining arguments and the closing parenthesis are missing here.
    subscriber1 = client.subscribe(stream=stream_name, local=True, subscription_name="subscriber1",

    #receive: read the published messages over stream.
    for i in range(10):
        # Each received frame is parsed as JSON; its "payload" field is base64-decoded.
        response = json.loads(subscriber1.recv())
        msg = base64.b64decode(response["payload"])
        print("Received Message:", msg)
        if response["messageId"]:
            # print("Acknowledging msg: ", response["messageId"])
            # NOTE(review): the statement below is a fragment; presumably part of an
            # acknowledgement send, but the surrounding call is missing. TODO confirm.
                {"payload": base64.b64encode(six.b(

# Script entry point: connects a client, creates a stream, prepares a subscriber
# thread, publishes ten messages and then waits for the subscriber thread.
# NOTE(review): several lines are missing from this copy of the sample (see the
# notes below); it will not run until they are restored from the pyC8 documentation.
if __name__ == '__main__':

    print("\n ------- CONNECTION SETUP  ------")
    print("user: {}, geofabric:{}".format(email, geo_fabric))
    print("\n1. CONNECT: federation: {},  user: {}".format(global_url, email))
    # NOTE(review): the C8Client(...) call is never closed; the trailing arguments
    # and the closing parenthesis are missing.
    client = C8Client(protocol='https', host=global_url, port=443,
                     email=email, password=password,

    print("\n ------- CREATE STREAM  (local/geo-replicated) ------")
    client.create_stream(stream_name, local=True)  # set local=False for geo-replicated stream available in all regions.
    print("Created stream: {}".format(stream_name))
    time.sleep(10)  # to account for network latencies in replication

    print("\n ------- CREATE SUBSCRIBER  ------")
    # NOTE(review): the thread is created here but no start() call is visible in this
    # copy, although join() is called at the end. Presumably a
    # `subscriber_thread.start()` line was dropped. TODO confirm.
    subscriber_thread = threading.Thread(target=create_subscriber)

    print("\n ------- CREATE PRODUCER  ------")
    print("Create producer on stream: {}".format(stream_name))
    producer = client.create_stream_producer(stream_name, local=True)
    print("\n ------- PUBLISH MESSAGES  ------")
    print("Publish 10 messages to stream: {}".format(stream_name))
    for i in range(10):
        msg = "Hello from  user--" + "(" + str(i) + ")"
        # The message text is base64-encoded and stored as a UTF-8 string under "payload".
        # NOTE(review): the dict literal is never closed, and the `try:` header, the
        # send call and the `else:` branch of the result check are missing below.
        data = {
                "payload": base64.b64encode(six.b(msg)).decode("utf-8"),
            response =  json.loads(producer.recv())
            if response['result'] == 'ok':
              print('Message published successfully')
              print('Failed to publish message:', response)
        except Exception as e:
            # NOTE(review): `m` is built but never printed or raised in the visible code.
            m = "Producer failed to send message due to Pulsar Error - %s" % e

    print("Publish messages done...")

    print("Wait for subscriber to consume all messages...")
    subscriber_thread.join()  # Wait for subscriber to consume all messages.
    print("\n ------- DONE  ------")
const jsc8 = require('jsc8')

// Connection settings for the sample.
// NOTE(review): fed_url and guest_email are empty strings in this copy of the
// sample; supply your federation URL and tenant email before running.
const fed_url = ""
const guest_email = ""
const guest_password = "guest"
const geo_fabric = "testfabric"
// Driver client used by setup(), getDCList() and the entry point.
const client = new jsc8(fed_url)

// Payloads passed to the producer; the consumer checks received payloads against this list.
const msgs = ["message 1", "message 2", "message 3"]
// Counter that the consumer's message handler compares against msgs.length.
let numberOfMessages = 0

/**
 * Logs in with the guest credentials and selects the working fabric.
 *
 * Reads the module-level `client`, `guest_email`, `guest_password` and
 * `geo_fabric`.
 *
 * @returns {Promise<void>} Resolves once login and fabric selection are done.
 */
async function setup() {
  console.log("Logging in...");
  await client.login(guest_email, guest_password);
  console.log("Using the fabric: " + geo_fabric);
  // The sample announced the fabric but never selected it on the client.
  await client.useFabric(geo_fabric);
}

/**
 * Looks up the fabric named `geo_fabric` among the user's fabrics and returns
 * the entries of its comma-separated `options.dcList` string.
 *
 * Reads the module-level `client` and `geo_fabric`.
 *
 * @returns {Promise<string[]>} The `dcList` entries, split on ",".
 * @throws {Error} If no fabric named `geo_fabric` is in the list.
 */
async function getDCList() {
  const fabrics = await client.listUserFabrics();
  // Array.prototype.find is synchronous, so no await is needed here.
  const fabric = fabrics.find((o) => o.name === geo_fabric);
  if (!fabric) {
    throw new Error(`Fabric "${geo_fabric}" was not found for this user`);
  }
  return fabric.options.dcList.split(",");
}

/**
 * Publishes every entry of the module-level `msgs` array through the stream's
 * producer.
 *
 * @param {object} stream - Stream handle exposing `producer(messages, url)`.
 * @returns {Promise<void>} Resolves when the producer call has completed.
 */
async function publish(stream) {
  console.log("\n ------- PUBLISH MESSAGES  ------");

  // NOTE(review): the second argument is an empty string in this copy of the
  // sample; presumably a datacenter URL belongs here. TODO confirm.
  await stream.producer(msgs, "");
}

// Opens a consumer with subscription name "my-sub" on `stream`, wrapped in a Promise.
// NOTE(review): this copy of the sample is truncated. The bodies of both `if`
// statements, the closing braces of the handlers, and the end of the function
// are missing, so it is not visible where `resolve` is called. Restore the
// missing lines from the jsc8 documentation before running.
async function receive(stream) {
  return new Promise(resolve =>
    stream.consumer("my-sub", {
      onmessage: (msg) => {
        // Each message is parsed as JSON and its `payload` field is extracted.
        const parsedMsg = JSON.parse(msg);
        const { payload } = parsedMsg;

        // The payload is base64-decoded and checked against the `msgs` list.
        // NOTE(review): body missing; presumably increments numberOfMessages. TODO confirm.
        if (msgs.includes(Buffer.from(payload, 'base64').toString())) {

        // NOTE(review): body missing; this is the point where all expected messages have been counted.
        if (numberOfMessages === msgs.length) {
      onopen: () => {
        console.log('consumer connection is open');
    // NOTE(review): the second argument is an empty string in this copy; presumably a datacenter URL. TODO confirm.
    }, "")

// Entry point: logs in, prints the fabric's dcList, creates the stream, then
// runs the consumer and the producer in the order used by the original sample.
(async function() {
  await setup();
  const dcList = await getDCList();
  console.log("dcList: ", dcList);
  // Here the last boolean value tells if the stream is local or global. false means that it is global.
  const stream = client.stream("testStream", false);

  await stream.createStream();

  // Consumer first, then producer (call order kept from the original sample).
  await receive(stream);

  await publish(stream);
})().catch((err) => {
  // Surface failures instead of leaving the promise rejection unhandled.
  console.error(err);
  process.exitCode = 1;
});