Cloudera Stream Processing Forum

Using Apache Pulsar with SQL Stream Builder

Cloudera Employee

You can easily get started with SQL Stream Builder (powered by Apache Flink) and Apache Pulsar using the Cloudera Stream Processing Community Edition (CSP CE) and the StreamNative Pulsar SQL connector. In this tutorial we will walk through the steps needed to integrate the two.


Make sure you have a working CSP CE deployment by following the documentation. 


Build the Stream Native Pulsar Connector for Apache Flink


Clone the repository to your machine and check out the version for Flink 1.14



git clone
git checkout tags/release-




Build the connector repository using Maven



mvn clean install -DskipTests





Open SQL Stream Builder (SSB) and add the Connector

  • Click Connectors on the Left Menu
  • Click Register Connector in the top right corner

[Image: Connectors menu]


  • Name the connector “pulsar”
  • Select CSV, JSON, AVRO, and RAW as the supported formats
  • Add the pulsar-flink-sql-connector_2.11- JAR file
    1. Depending on when you build this, the version and file name could be slightly different
    2. The JAR is found in the cloned repository under /pulsar-flink/pulsar-flink-sql-connector/target
    3. You could also register the connector with its required properties, but for this tutorial we will skip that part


  • Restart the SSB container to add the new JAR to the classpath



docker restart $(docker ps -a --format '{{.Names}}' --filter "name=ssb-sse.(\d)")




Create a Table that uses Pulsar

To get started, we will create a simple table that reads RAW data as a single string.

  • Click Console on the right
  • Add the DDL to the window
    1. You will need to update the topic, admin-url, and service-url values
      1. For admin-url and service-url, PULSAR_HOST should be the IP address of the host running the Pulsar instance/container
    2. It is also very important to make sure that 'generic' = 'true' is part of the properties, so that SSB resolves the correct table/topic names when executing a query. For more information see this ticket -
  • Click Execute




CREATE TABLE pulsar_test (
  `city` STRING
) WITH (
  'connector' = 'pulsar',
  'topic' = 'topic82547611',
  'value.format' = 'raw',
  'service-url' = 'pulsar://PULSAR_HOST:6650',
  'admin-url' = 'http://PULSAR_HOST:8080',
  'scan.startup.mode' = 'earliest',
  'generic' = 'true'
)
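The PULSAR_HOST placeholders have to be replaced by hand before executing the DDL. If you script your table setup, the substitution can be done programmatically instead; a minimal sketch in Python (the host IP used here is just an example value):

```python
# Fill in the host placeholder in the Pulsar table DDL before submitting it
# to SSB. The host and topic values below are illustrative -- substitute the
# address of your own Pulsar deployment.
DDL_TEMPLATE = """
CREATE TABLE pulsar_test (
  `city` STRING
) WITH (
  'connector' = 'pulsar',
  'topic' = '{topic}',
  'value.format' = 'raw',
  'service-url' = 'pulsar://{host}:6650',
  'admin-url' = 'http://{host}:8080',
  'scan.startup.mode' = 'earliest',
  'generic' = 'true'
)
"""

ddl = DDL_TEMPLATE.format(topic="topic82547611", host="192.168.1.50")
print(ddl)
```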






Start a Pulsar Container

Check out the latest Pulsar documentation for more details on running a standalone Pulsar container.


Run the following Docker command to start a standalone Pulsar deployment



docker run -it -p 6650:6650  -p 8080:8080 --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar:2.10.1 bin/pulsar standalone





Push messages to Pulsar and start a query in SSB

It’s possible to use SSB to write data to Pulsar with a SQL INSERT statement. Run the command below in the SSB Console and click Execute.


Tip - You can execute only part of the code in the Console by highlighting the statements you wish to run before clicking Execute.




INSERT INTO pulsar_test
VALUES ('data 1'),('data 2'),('data 3')




Flink and SSB come with the ability to generate data with the faker connector. The following table randomly generates messages that we can insert into our Pulsar table.




CREATE TABLE fake_data (
  `city` STRING
) WITH (
  'connector' = 'faker',
  'rows-per-second' = '1',
  'fields.city.expression' = '#{Address.city}'
)
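The faker connector evaluates Java Datafaker expressions such as #{Address.city} to produce each field. To get a feel for the kind of rows it emits, here is a rough stand-in in plain Python (simple seeded random sampling from a hypothetical city list, not the actual Datafaker library):

```python
import random

# Stand-in for the faker connector's city expression: emit pseudo-random
# city names. The city list is illustrative only.
CITIES = ["Lisbon", "Austin", "Nairobi", "Osaka", "Krakow"]

def fake_rows(n, seed=42):
    """Return n fake 'city' values; seeded so runs are repeatable."""
    rng = random.Random(seed)
    return [rng.choice(CITIES) for _ in range(n)]

rows = fake_rows(3)
print(rows)
```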




The generated data can be inserted into our Pulsar table with a normal SQL query



INSERT INTO pulsar_test
SELECT * FROM fake_data;




Query Data from Pulsar

The data can also be queried back out the same way, with a sample of the results displayed in the UI.


Tip - When you run a query, only a small sample of the data is shown in the SSB UI by default. To have all records returned in the SSB UI console, click Job Settings -> Sample Behavior -> Sample All Messages.



Select the data from the test table



SELECT * FROM pulsar_test






PS - It's also possible to publish messages with the Pulsar client and read them back out. To see how to publish with the Pulsar client, see here:
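As a rough illustration of that path, a minimal sketch with the Python Pulsar client (pip install pulsar-client) is below. The PULSAR_HOST placeholder and topic name are assumptions carried over from the DDL above; replace them with your own values.

```python
# Hypothetical sketch: publish a few messages to the tutorial topic with the
# Python Pulsar client. Requires `pip install pulsar-client` and a reachable
# broker; PULSAR_HOST and the topic name are placeholder assumptions.
MESSAGES = ["data 1", "data 2", "data 3"]

def publish(messages, service_url="pulsar://PULSAR_HOST:6650",
            topic="topic82547611"):
    # Imported lazily so the sketch can be read without the client installed.
    import pulsar

    client = pulsar.Client(service_url)
    producer = client.create_producer(topic)
    for m in messages:
        producer.send(m.encode("utf-8"))  # Pulsar payloads are raw bytes
    client.close()

# Usage (against a live broker):
# publish(MESSAGES, service_url="pulsar://192.168.1.50:6650")
```

The messages published this way show up in the `pulsar_test` table query in SSB, since the table reads the topic's raw bytes as a single string column.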