02-27-2023
11:16 PM
Cloudera CSP CE Plus Apache Pulsar
See: pulsar-csp-ce
Source: https://github.com/tspannhw/pulsar-csp-ce
Integration
https://community.cloudera.com/t5/Cloudera-Stream-Processing-Forum/Using-Apache-Pulsar-with-SQL-Stream-Builder/m-p/349917
Once the containers are running in Docker, open the SQL Stream Builder login page:
http://localhost:18121/ui/login
Log in with admin/admin.
SQL Testing
CREATE TABLE pulsar_test (
`city` STRING
) WITH (
'connector' = 'pulsar',
'topic' = 'topic82547611',
'value.format' = 'raw',
'service-url' = 'pulsar://Timothys-MBP:6650',
'admin-url' = 'http://Timothys-MBP:8080',
'scan.startup.mode' = 'earliest',
'generic' = 'true'
);
CREATE TABLE `pulsar_table_1670269295` (
`col_str` STRING,
`col_int` INT,
`col_ts` TIMESTAMP(3),
WATERMARK FOR `col_ts` AS col_ts - INTERVAL '5' SECOND
) WITH (
'format' = 'json' -- Data format
-- 'json.encode.decimal-as-plain-number' = 'false' -- Optional flag to specify whether to encode all decimals as plain numbers instead of possible scientific notations, false by default.
-- 'json.fail-on-missing-field' = 'false' -- Optional flag to specify whether to fail if a field is missing or not, false by default.
-- 'json.ignore-parse-errors' = 'false' -- Optional flag to skip fields and rows with parse errors instead of failing; fields are set to null in case of errors, false by default.
-- 'json.map-null-key.literal' = 'null' -- Optional flag to specify string literal for null keys when 'map-null-key.mode' is LITERAL, "null" by default.
-- 'json.map-null-key.mode' = 'FAIL' -- Optional flag to control the handling mode when serializing null key for map data, FAIL by default. Option DROP will drop null key entries for map data. Option LITERAL will use 'map-null-key.literal' as key literal.
-- 'json.timestamp-format.standard' = 'SQL' -- Optional flag to specify timestamp format, SQL by default. Option ISO-8601 will parse input timestamp in "yyyy-MM-ddTHH:mm:ss.s{precision}" format and output timestamp in the same format. Option SQL will parse input timestamp in "yyyy-MM-dd HH:mm:ss.s{precision}" format and output timestamp in the same format.
);
CREATE TABLE pulsar_citibikenyc (
`num_docks_disabled` DOUBLE,
`eightd_has_available_keys` STRING,
`station_status` STRING,
`last_reported` DOUBLE,
`is_installed` DOUBLE,
`num_ebikes_available` DOUBLE,
`num_bikes_available` DOUBLE,
`station_id` DOUBLE,
`is_renting` DOUBLE,
`is_returning` DOUBLE,
`num_docks_available` DOUBLE,
`num_bikes_disabled` DOUBLE,
`legacy_id` DOUBLE,
`valet` STRING,
`eightd_active_station_services` STRING,
`ts` DOUBLE,
`uuid` STRING
) WITH (
'connector' = 'pulsar',
'topic' = 'persistent://public/default/citibikenyc',
'value.format' = 'json',
'service-url' = 'pulsar://Timothys-MBP:6650',
'admin-url' = 'http://Timothys-MBP:8080',
'scan.startup.mode' = 'earliest',
'generic' = 'true'
);
CREATE TABLE pulsar_thermalsensors (
`uuid` STRING NOT NULL,
`ipaddress` STRING NOT NULL,
`cputempf` INT NOT NULL,
`runtime` INT NOT NULL,
`host` STRING NOT NULL,
`hostname` STRING NOT NULL,
`macaddress` STRING NOT NULL,
`endtime` STRING NOT NULL,
`te` STRING NOT NULL,
`cpu` FLOAT NOT NULL,
`diskusage` STRING NOT NULL,
`memory` FLOAT NOT NULL,
`rowid` STRING NOT NULL,
`systemtime` STRING NOT NULL,
`ts` INT NOT NULL,
`starttime` STRING NOT NULL,
`datetimestamp` STRING NOT NULL,
`temperature` FLOAT NOT NULL,
`humidity` FLOAT NOT NULL,
`co2` FLOAT NOT NULL,
`totalvocppb` FLOAT NOT NULL,
`equivalentco2ppm` FLOAT NOT NULL,
`pressure` FLOAT NOT NULL,
`temperatureicp` FLOAT NOT NULL
) WITH (
'connector' = 'pulsar',
'topic' = 'persistent://public/default/thermalsensors',
'value.format' = 'json',
'service-url' = 'pulsar://Timothys-MBP:6650',
'admin-url' = 'http://Timothys-MBP:8080',
'scan.startup.mode' = 'earliest',
'generic' = 'true'
);
CREATE TABLE fake_data (
city STRING )
WITH (
'connector' = 'faker',
'rows-per-second' = '1',
'fields.city.expression' = '#{Address.city}'
);
insert into pulsar_test
select * from fake_data;
select last_reported, num_bikes_available, station_id, num_docks_available, ts
from
pulsar_citibikenyc;
select `systemtime`, `cputempf`, `cpu`, `humidity`, `co2`, `temperature`, `totalvocppb`, `equivalentco2ppm`, `pressure`, `temperatureicp`
from pulsar_thermalsensors;
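The Pulsar tables above repeat the same WITH options and differ only in topic, value format, and columns. As a rough sketch, a small Python helper could render that DDL from a column mapping. The helper name pulsar_table_ddl and the PULSAR_OPTIONS dictionary are hypothetical and not part of SQL Stream Builder; the option values are copied from the statements above, including the author's host name, which you would replace with your own.

```python
# Connector options copied from the Pulsar tables above. The host name is
# specific to the author's machine and is only used here as a default.
PULSAR_OPTIONS = {
    "connector": "pulsar",
    "value.format": "json",
    "service-url": "pulsar://Timothys-MBP:6650",
    "admin-url": "http://Timothys-MBP:8080",
    "scan.startup.mode": "earliest",
    "generic": "true",
}


def pulsar_table_ddl(table, topic, columns, options=None):
    """Render a CREATE TABLE statement for a Pulsar-backed table.

    `columns` maps column name -> Flink SQL type. Hypothetical helper for
    illustration only; it does no escaping or validation.
    """
    opts = dict(PULSAR_OPTIONS)
    opts["topic"] = topic
    opts.update(options or {})  # caller overrides, e.g. value.format = raw
    col_lines = ",\n".join(
        f"  `{name}` {sql_type}" for name, sql_type in columns.items()
    )
    opt_lines = ",\n".join(
        f"  '{key}' = '{value}'" for key, value in opts.items()
    )
    return f"CREATE TABLE {table} (\n{col_lines}\n) WITH (\n{opt_lines}\n);"


if __name__ == "__main__":
    # Reproduces the options of the pulsar_test table defined earlier.
    print(pulsar_table_ddl(
        "pulsar_test",
        "topic82547611",
        {"city": "STRING"},
        options={"value.format": "raw"},
    ))
```

The generated statement can be pasted into the SQL editor like the hand-written ones; keeping the shared options in one place makes it easier to change the service and admin URLs when moving off a laptop.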
Create a Materialized View
http://localhost:18131/api/v1/query/5202/thermal?key=c674a39b-921a-4759-a2fb-e599366cfe51
/api/v1/query/5202/thermal?key=c674a39b-921a-4759-a2fb-e599366cfe51
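The materialized view is exposed as a REST endpoint, so any HTTP client can read it. Below is a minimal Python sketch that rebuilds the URL shown above and fetches it with the standard library. The function names are hypothetical, the meaning of the 5202 path segment (treated here as a query id) is an assumption, and the response is assumed to be JSON.

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen


def materialized_view_url(query_id, endpoint, api_key,
                          base="http://localhost:18131"):
    """Build the REST URL for a materialized view endpoint.

    The path layout mirrors the example URL above; `query_id` is an
    assumed name for the numeric segment.
    """
    query = urlencode({"key": api_key})
    return f"{base}/api/v1/query/{query_id}/{endpoint}?{query}"


def fetch_rows(url, timeout=10):
    """GET the endpoint and decode the body as JSON (assumed format)."""
    with urlopen(url, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


if __name__ == "__main__":
    url = materialized_view_url(
        5202, "thermal", "c674a39b-921a-4759-a2fb-e599366cfe51"
    )
    print(url)
    # Calling fetch_rows(url) needs the CSP CE containers running locally.
```

The key in the query string is the API key generated for the materialized view, so treat the full URL as a credential.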
Running SQL Stream Builder (Flink SQL) against Pulsar
Containers Running Cloudera CSP CE in Docker
References
https://github.com/tspannhw/create-nifi-pulsar-flink-apps
https://github.com/streamnative/flink-example/blob/main/docker-compose.yml
https://docs.cloudera.com/csp-ce/latest/index.html
https://github.com/tspannhw/SpeakerProfile
03-07-2022
08:28 PM
Cloudera has added a StreamNative community-supported processor.
Reading OpenSea NFT with Apache NiFi, Apache Pulsar, and Friends. Meetup. StreamNative
Upcoming Meetup Demo
Apache Pulsar and Apache NiFi for Cloud Data Lakes
Pulsar Build
bin/pulsar-admin topics delete persistent://public/default/nft
bin/pulsar-admin topics delete persistent://public/default/crypto
bin/pulsar-admin topics create persistent://public/default/nft
bin/pulsar-admin topics create persistent://public/default/crypto
bin/pulsar-client consume "persistent://public/default/crypto" -s "nftreadercrypto" -n 0
bin/pulsar-client consume "persistent://public/default/nft" -s "nftyreader" -n 0
bin/pulsar-admin schemas get "persistent://public/default/nft"
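The commands above use fully qualified topic names of the form domain://tenant/namespace/topic. A short name with no prefix is resolved by Pulsar to persistent://public/default/. The Python sketch below splits a name into those parts; TopicName and parse_topic are hypothetical helpers written for illustration, not part of the Pulsar client.

```python
from typing import NamedTuple


class TopicName(NamedTuple):
    domain: str      # "persistent" or "non-persistent"
    tenant: str
    namespace: str
    topic: str

    def __str__(self):
        return f"{self.domain}://{self.tenant}/{self.namespace}/{self.topic}"


def parse_topic(name):
    """Expand a Pulsar topic name into domain, tenant, namespace, topic.

    Short names fall back to persistent://public/default/<name>.
    """
    if "://" not in name:
        return TopicName("persistent", "public", "default", name)
    domain, rest = name.split("://", 1)
    parts = rest.split("/")
    valid_domain = domain in ("persistent", "non-persistent")
    if not valid_domain or len(parts) != 3 or not all(parts):
        raise ValueError(f"unexpected topic name: {name!r}")
    return TopicName(domain, *parts)


if __name__ == "__main__":
    for name in ("persistent://public/default/nft", "crypto"):
        print(parse_topic(name))
```

This is handy when the same topic is referenced by short name in one tool and by full name in another, as happens between pulsar-client and table definitions.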
Apache NiFi Flows
NiFi -> Pulsar Flow
Example NFT JSON Record
{
"date" : "Thu, 24 Feb 2022 22:26:41 GMT",
"short_description" : "",
"featured" : "false",
"image_thumbnail_url" : "https://lh3.googleusercontent.com/-dsYkPkvAQW6KvP0Nnbjecb4Pqo1RsLvTlKKDdSPiUR4LFGqTp3mvf2uOkLtF5p1c1eLIZi6S6WhqvOX4dY-MPhCy8zg8E5_PIUTrA=s128",
"asset_contract_created_date" : "2022-02-21T01:50:07.184731",
"asset_contract_owner" : "240677413",
"image_preview_url" : "https://lh3.googleusercontent.com/-dsYkPkvAQW6KvP0Nnbjecb4Pqo1RsLvTlKKDdSPiUR4LFGqTp3mvf2uOkLtF5p1c1eLIZi6S6WhqvOX4dY-MPhCy8zg8E5_PIUTrA=s250",
"asset_contract_symbol" : "Bunny Buddies",
"twitter_username" : "",
"description" : "The Bunny Buddies are 8888 NFTs created by Ryan Robinson. With this collection, the 3D artist explores duality and looks for the right balance between darkness and light.",
"asset_contract_address" : "0x91cc3844b8271337679f8c00cb2d238886917d40",
"external_url" : "https://bunny-buddies.com/",
"token_id" : "4640",
"asset_contract_name" : "Bunny Buddies",
"asset_contract_nft_version" : "3.0",
"asset_contract_description" : "The Bunny Buddies are a collection of 8888 collectibles designed by Ryan Robinson aka Rhabbitz (Yobritish). These bunnies with crazy aesthetics and strong personalities are on their way to take over the metaverse. \nTo know more about the project, visit our website : https://bunny-buddies.com",
"asset_contract_external_link" : "https://bunny-buddies.com/",
"id" : "304361312",
"featured_image_url" : "https://lh3.googleusercontent.com/xE_aoM0UA6n1-Y5_eaI5YOq3EHJA8S7AKsQrGSminMv0kuWcg2ifdz0riEdpiusV6ZPWXEE_HY1JL7QksrpLlVpORVasp5HewjgBvQ=s300",
"slug" : "bunny-buddies",
"token_metadata" : "https://bunnybuddies.mypinata.cloud/ipfs/QmRRavBvkQLEd1F64QxtQoeUzPvUZFJo6dhmtT5wWbvnQR/4640",
"asset_contract_schema_name" : "ERC721",
"animation_url" : "https://storage.opensea.io/files/482cae8fbabbff6c1d208426ed8ad961.mp4",
"num_sales" : "0",
"image_url" : "https://lh3.googleusercontent.com/wqWbyWmm2jrGHmuCoudc96Cikocao2L4XE1NxHSdl87I9rWC_wvA2l0ubxtQqDBpeSibZbrOuJiWK0dNTOIMDaBRIWvIkoDraMb8PWM=s120",
"asset_contract_default_to_fiat" : "false",
"external_link" : "",
"image_original_url" : "https://bunnybuddies.mypinata.cloud/ipfs/QmZ8xSD7Dt48FYinQ2SKK95q2boJk7QZHsWDCKfTXUYm3J/reveal.mp4",
"asset_contract_payout_address" : "0xfde43ebd4f75960cdac70971b731e0bab144c8f2",
"animation_original_url" : "https://bunnybuddies.mypinata.cloud/ipfs/QmZ8xSD7Dt48FYinQ2SKK95q2boJk7QZHsWDCKfTXUYm3J/reveal.mp4",
"background_color" : "",
"asset_contract_asset_contract_type" : "non-fungible",
"name" : "#4641",
"asset_contract_image_url" : "https://lh3.googleusercontent.com/wqWbyWmm2jrGHmuCoudc96Cikocao2L4XE1NxHSdl87I9rWC_wvA2l0ubxtQqDBpeSibZbrOuJiWK0dNTOIMDaBRIWvIkoDraMb8PWM=s120",
"asset_contract_total_supply" : "0"
}
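Every value in the example record is a string, including flags such as "featured" and counters such as "num_sales". If downstream consumers want typed values, one option is a small normalization step. The sketch below is an assumption about how that could look, not the flow used in the demo; the field lists are picked by hand from the record above, and normalize_nft is a hypothetical name.

```python
import json

# Fields chosen by inspecting the example record; extend as needed.
BOOL_FIELDS = ("featured", "asset_contract_default_to_fiat")
INT_FIELDS = ("num_sales", "asset_contract_total_supply")


def normalize_nft(raw):
    """Parse one NFT JSON record and coerce a few string fields.

    "true"/"false" become booleans, numeric counters become ints, and
    empty strings become None. All other values are left untouched.
    """
    record = json.loads(raw)
    for field in BOOL_FIELDS:
        if field in record:
            record[field] = record[field].strip().lower() == "true"
    for field in INT_FIELDS:
        if field in record:
            record[field] = int(record[field])
    return {
        key: (None if value == "" else value)
        for key, value in record.items()
    }


if __name__ == "__main__":
    sample = '{"featured": "false", "num_sales": "0", "external_link": ""}'
    print(normalize_nft(sample))
```

Identifiers such as "token_id" and "id" are deliberately left as strings, since token ids can exceed the range of fixed-width integer types in other systems.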
StreamNative Cloud
Resources
https://github.com/tspannhw/FLiPN-Demos
Producing and Consuming Pulsar messages with Apache NiFi
https://github.com/streamnative/pulsar-nifi-bundle
Pulsar in Python on Pi for Sensors
https://www.linkedin.com/newsletters/flip-stack-weekly-6861715928728576000/
https://github.com/tspannhw/SpeakerProfile/tree/main/2022/talks
https://www.slideshare.net/bunkertor/devfest-uk-ireland-using-apache-nifi-with-apache-pulsar-for-fast-data-onramp-2022
https://www.slideshare.net/bunkertor/data-science-online-camp-using-the-flipn-stack-for-edge-ai-flink-nifi-pulsar
https://www.slideshare.net/bunkertor/api-world-apache-nifi-101
https://www.slideshare.net/bunkertor/ai-dev-world-utilizing-apache-pulsar-apache-ni-fi-and-minifi-for-edgeai-iot-at-scale
https://github.com/tspannhw/EverythingApacheNiFi
https://www.slideshare.net/bunkertor/apachecon-2021-apache-nifi-101-introduction-and-best-practices
https://www.pulsardeveloper.com/
https://arcade.sqlbits.com/session-details/?id=298240
Resources
https://github.com/tspannhw/awesome-nifi-pulsar/blob/main/README.md
Cloudera What's New in Flow Management
Cloudera CDF Datahub Supported Partner Components
https://github.com/tspannhw/FLiPN-NFT
https://www.datainmotion.dev/2021/11/producing-and-consuming-pulsar-messages.html