Community Articles

Find and share helpful community-sourced technical articles.
Labels (2)
avatar
New Contributor

Cloudera CSP CE Plus Apache Pulsar

See:  pulsar-csp-ce

Source:   https://github.com/tspannhw/pulsar-csp-ce

Integration

Once the containers are running in Docker, open the login page:

http://localhost:18121/ui/login

Login

Username / password: admin/admin

SQL Testing

-- Pulsar-backed table with a single STRING column; the message value uses the 'raw' format.
-- The broker and admin URLs point at the author's machine (Timothys-MBP); substitute your own host.
CREATE TABLE pulsar_test (
    `city` STRING
)
WITH (
    'connector'         = 'pulsar',
    'topic'             = 'topic82547611',
    'value.format'      = 'raw',
    'service-url'       = 'pulsar://Timothys-MBP:6650',
    'admin-url'         = 'http://Timothys-MBP:8080',
    'scan.startup.mode' = 'earliest',
    'generic'           = 'true'
);

-- Template table: three typed columns plus a watermark that trails `col_ts` by 5 seconds.
-- NOTE(review): only 'format' is set below; no 'connector' option is visible here -- confirm one
-- is supplied before this table is queried.
CREATE TABLE `pulsar_table_1670269295` (
    `col_str` STRING,
    `col_int` INT,
    `col_ts`  TIMESTAMP(3),
    WATERMARK FOR `col_ts` AS `col_ts` - INTERVAL '5' SECOND
)
WITH (
    -- Data format.
    'format' = 'json'
    --
    -- Optional JSON settings, left disabled; each value shown is the documented default.
    --
    -- Whether to encode all decimals as plain numbers instead of possible scientific notation.
    -- 'json.encode.decimal-as-plain-number' = 'false'
    --
    -- Whether to fail if a field is missing.
    -- 'json.fail-on-missing-field' = 'false'
    --
    -- Whether to skip fields and rows with parse errors instead of failing; fields are set to
    -- null in case of errors.
    -- 'json.ignore-parse-errors' = 'false'
    --
    -- String literal used for null keys when 'map-null-key.mode' is LITERAL ("null" by default).
    -- 'json.map-null-key.literal' = 'null'
    --
    -- Handling mode when serializing a null key for map data. FAIL is the default; DROP drops
    -- null-key entries; LITERAL uses 'map-null-key.literal' as the key literal.
    -- 'json.map-null-key.mode' = 'FAIL'
    --
    -- Timestamp format. SQL (default) parses and emits "yyyy-MM-dd HH:mm:ss.s{precision}";
    -- ISO-8601 parses and emits "yyyy-MM-ddTHH:mm:ss.s{precision}".
    -- 'json.timestamp-format.standard' = 'SQL'
);


-- JSON-formatted Pulsar table over the persistent://public/default/citibikenyc topic.
-- NOTE(review): id-like and time-like fields (station_id, legacy_id, last_reported, ts) are
-- declared DOUBLE; presumably this mirrors how the JSON payload is produced -- confirm.
CREATE TABLE pulsar_citibikenyc (
    `num_docks_disabled`             DOUBLE,
    `eightd_has_available_keys`      STRING,
    `station_status`                 STRING,
    `last_reported`                  DOUBLE,
    `is_installed`                   DOUBLE,
    `num_ebikes_available`           DOUBLE,
    `num_bikes_available`            DOUBLE,
    `station_id`                     DOUBLE,
    `is_renting`                     DOUBLE,
    `is_returning`                   DOUBLE,
    `num_docks_available`            DOUBLE,
    `num_bikes_disabled`             DOUBLE,
    `legacy_id`                      DOUBLE,
    `valet`                          STRING,
    `eightd_active_station_services` STRING,
    `ts`                             DOUBLE,
    `uuid`                           STRING
)
WITH (
    'connector'         = 'pulsar',
    'topic'             = 'persistent://public/default/citibikenyc',
    'value.format'      = 'json',
    'service-url'       = 'pulsar://Timothys-MBP:6650',
    'admin-url'         = 'http://Timothys-MBP:8080',
    'scan.startup.mode' = 'earliest',
    'generic'           = 'true'
);

-- JSON-formatted Pulsar table over the persistent://public/default/thermalsensors topic.
-- Every column is declared NOT NULL.
-- The statement is terminated with ';' so it can be run in a script together with the
-- statements that follow it.
CREATE TABLE pulsar_thermalsensors (
  `uuid` STRING NOT NULL,
  `ipaddress` STRING NOT NULL,
  `cputempf` INT NOT NULL,
  `runtime` INT NOT NULL,
  `host` STRING NOT NULL,
  `hostname` STRING NOT NULL,
  `macaddress` STRING NOT NULL,
  `endtime` STRING NOT NULL,
  `te` STRING NOT NULL,
  `cpu` FLOAT NOT NULL,
  `diskusage` STRING NOT NULL,
  `memory` FLOAT NOT NULL,
  `rowid` STRING NOT NULL,
  `systemtime` STRING NOT NULL,
  `ts` INT NOT NULL,
  `starttime` STRING NOT NULL,
  `datetimestamp` STRING NOT NULL,
  `temperature` FLOAT NOT NULL,
  `humidity` FLOAT NOT NULL,
  `co2` FLOAT NOT NULL,
  `totalvocppb` FLOAT NOT NULL,
  `equivalentco2ppm` FLOAT NOT NULL,
  `pressure` FLOAT NOT NULL,
  `temperatureicp` FLOAT NOT NULL
) WITH (
  'connector' = 'pulsar',
  'topic' = 'persistent://public/default/thermalsensors',
  'value.format' = 'json',
  'service-url' = 'pulsar://Timothys-MBP:6650',
  'admin-url' = 'http://Timothys-MBP:8080',
  'scan.startup.mode' = 'earliest',
  'generic' = 'true'
);


-- Generator table using the 'faker' connector: one row per second, with `city` filled from
-- the #{Address.city} expression.
CREATE TABLE fake_data (
    city STRING
)
WITH (
    'connector'              = 'faker',
    'rows-per-second'        = '1',
    'fields.city.expression' = '#{Address.city}'
);

-- Copy generated city values from fake_data into the Pulsar-backed pulsar_test table.
-- Columns are named explicitly on both sides so the statement does not depend on column order
-- or silently change meaning if either table gains a column.
INSERT INTO pulsar_test (city)
SELECT city
FROM fake_data;


-- Project the availability-related fields from the Citi Bike station feed.
SELECT
    last_reported,
    num_bikes_available,
    station_id,
    num_docks_available,
    ts
FROM pulsar_citibikenyc;

-- Project the sensor readings from the thermal sensor feed.
-- Terminated with ';' so the statement is complete on its own and does not run into
-- whatever follows it.
SELECT
    `systemtime`,
    `cputempf`,
    `cpu`,
    `humidity`,
    `co2`,
    `temperature`,
    `totalvocppb`,
    `equivalentco2ppm`,
    `pressure`,
    `temperatureicp`
FROM pulsar_thermalsensors;

Create a Materialized View

http://localhost:18131/api/v1/query/5202/thermal?key=c674a39b-921a-4759-a2fb-e599366cfe51

/api/v1/query/5202/thermal?key=c674a39b-921a-4759-a2fb-e599366cfe51

Running SQL Stream Builder (Flink SQL) against Pulsar

TimSpannStream_0-1670288111845.jpeg
TimSpannStream_1-1670288111667.jpeg
TimSpannStream_2-1670288112022.jpeg
TimSpannStream_3-1670288111901.jpeg
TimSpannStream_4-1670288112517.jpeg
TimSpannStream_5-1670288111938.jpeg
TimSpannStream_6-1670288112041.jpeg
TimSpannStream_7-1670288112704.jpeg
TimSpannStream_8-1670288112702.jpeg
TimSpannStream_9-1670288111952.jpeg
TimSpannStream_10-1670288112771.jpeg
TimSpannStream_11-1670288112910.jpeg
TimSpannStream_13-1670288112904.jpeg
TimSpannStream_14-1670288112293.jpeg
TimSpannStream_15-1670288112494.jpeg

 

TimSpannStream_16-1670288112184.jpeg

Containers Running Cloudera CSP CE in Docker

TimSpannStream_17-1670288112896.jpeg

References

773 Views