Using Apache Pulsar with SQL Stream Builder

You can easily get started with SQL Stream Builder, powered by Apache Flink, and Apache Pulsar using the Cloudera Stream Processing Community Edition (CSP CE) and the StreamNative Pulsar SQL connector. In this tutorial we will go over the steps needed to integrate the two.

 

Make sure you have a working CSP CE deployment by following the documentation.

https://docs.cloudera.com/csp-ce/latest/index.html 
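
A quick way to check that the deployment is up (assuming CSP CE was installed with the documented Docker Compose setup) is to confirm that the containers are running, in particular the SSB Streaming SQL Engine container used later in this tutorial:

# The ssb-sse container should show a status of "Up"
docker ps --filter "name=ssb-sse"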

 

Build the StreamNative Pulsar Connector for Apache Flink

https://github.com/streamnative/pulsar-flink/

 

Clone the repository to your machine and check out the release for Flink 1.14:

git clone https://github.com/streamnative/pulsar-flink.git
cd pulsar-flink
git checkout tags/release-1.14.3.4

Build the connector using Maven:

mvn clean install -DskipTests
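
When the build finishes, the SQL connector jar that will be registered in SSB is produced under pulsar-flink-sql-connector/target; the exact file name depends on the release you checked out:

# From the pulsar-flink directory, list the SQL connector jar produced by the build
ls pulsar-flink-sql-connector/target/pulsar-flink-sql-connector_*.jar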

Open SQL Stream Builder (SSB) and add the Connector

  • Click Connectors in the left menu
  • Click Register Connector in the top right corner

connectors menu.png

 

  • Name the connector “pulsar”
  • Select CSV, JSON, AVRO, and RAW
  • Add the pulsar-flink-sql-connector_2.11-1.14.3.4.jar file
    1. Depending on when you build the connector, the version in the file name may differ slightly
    2. The jar is found in the cloned repository under pulsar-flink/pulsar-flink-sql-connector/target
    3. You could also register the connector with the properties it requires, but we will skip that part in this tutorial

pulsarconnector.png

  • Restart the SSB container so the new jar is added to the classpath

docker restart $(docker ps -a --format '{{.Names}}' --filter "name=ssb-sse.(\d)")

Create a Table that uses Pulsar

To get started we will create a very simple table that reads RAW data as a single large string.

  • Click Console on the right
  • Add the DDL to the window
    1. You will need to update the topic, admin-url, and service-url
      1. For admin-url and service-url, PULSAR_HOST should be the IP address of the host running the Pulsar instance/container
    2. It's also very important to make sure that 'generic' = 'true' is part of the properties, so that SSB resolves the correct table/topic names when executing a query. For more information see this ticket: https://github.com/streamnative/pulsar-flink/issues/528
  • Click Execute

CREATE TABLE pulsar_test (
  `city` STRING
) WITH (
  'connector' = 'pulsar',
  'topic' = 'topic82547611',
  'value.format' = 'raw',
  'service-url' = 'pulsar://PULSAR_HOST:6650',
  'admin-url' = 'http://PULSAR_HOST:8080',
  'scan.startup.mode' = 'earliest',
  'generic' = 'true'
);

console-ddl.png

 

Start a Pulsar Container

See the latest Pulsar documentation for more details on running a standalone Pulsar container: https://pulsar.apache.org/docs/getting-started-docker/

 

Run the following Docker command to start a standalone Pulsar deployment:

docker run -it -p 6650:6650 -p 8080:8080 --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar:2.10.1 bin/pulsar standalone
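
Once the standalone broker reports that it is up, you can optionally verify it is reachable through the admin API on the mapped port 8080; a standalone deployment should report a single cluster named "standalone":

# Query the Pulsar admin REST API; expected output: ["standalone"]
curl http://localhost:8080/admin/v2/clusters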

Push messages to Pulsar and start a query in SSB

You can use SSB to write data to Pulsar with a SQL INSERT statement by running the command below in the SSB Console and clicking Execute.

 

Tip - You can execute only part of the code in the Console by highlighting the statements you wish to run before clicking Execute.

INSERT INTO pulsar_test
VALUES ('data 1'),('data 2'),('data 3')

Flink and SSB come with the ability to generate data with Faker. The following table will randomly generate city names that we can insert into our Pulsar table.

CREATE TABLE fake_data (
  `city` STRING
) WITH (
  'connector' = 'faker',
  'rows-per-second' = '1',
  'fields.city.expression' = '#{Address.city}'
);

The generated data can be inserted into our Pulsar table just like a normal SQL query:

INSERT INTO pulsar_test
SELECT * FROM fake_data;

Query Data from Pulsar

The data can also be queried back out the same way, with a sample of the results displayed in the UI.

 

Tip - When you run a query, only a small sample of the data is shown in the SSB UI by default. To have all records returned in the SSB UI console, change the sampler configuration by clicking Job Settings -> Sample Behavior -> Sample All Messages.

sampler.png

 

Select the data from the test table

SELECT * FROM pulsar_test

citiesqueriy-ssb.png

 

PS - It's also possible to publish messages with the Pulsar client and read them back out. To see how to publish with the Pulsar client, see: https://pulsar.apache.org/docs/getting-started-docker/#produce-a-message
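
As a rough sketch (assuming the standalone container started above is still running and, for illustration, that it was started with --name pulsar; otherwise substitute the container name shown by docker ps), producing and consuming on the same topic used by the pulsar_test table could look like this:

# Produce a test message to the topic backing the pulsar_test table
docker exec -it pulsar bin/pulsar-client produce topic82547611 --messages "hello from pulsar-client"

# Read messages back with a consumer (-n 0 keeps consuming until you press Ctrl+C)
docker exec -it pulsar bin/pulsar-client consume topic82547611 -s test-subscription -n 0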
