Member since 
    
	
		
		
		11-16-2021
	
	
	
	
	
	
	
	
	
	
	
	
	
	
			
      
                3
            
            
                Posts
            
        
                4
            
            
                Kudos Received
            
        
                0
            
            
                Solutions
            
        
			
    
	
		
		
		02-27-2023
	
		
		11:16 PM
	
	
	
	
	
	
	
	
	
	
	
	
	
	
		
	
				
		
			
					
	
		1 Kudo
		
	
				
		
	
		
					
							 
 Cloudera CSP CE Plus Apache Pulsar 
 See:  pulsar-csp-ce 
 Source:   https://github.com/tspannhw/pulsar-csp-ce 
 Integration 
 
 https://community.cloudera.com/t5/Cloudera-Stream-Processing-Forum/Using-Apache-Pulsar-with-SQL-Stream-Builder/m-p/349917 
 
 Once running in Docker 
 http://localhost:18121/ui/login 
 Login 
 admin/admin 
 SQL Testing 
 
 CREATE TABLE pulsar_test (
  `city` STRING
) WITH (
  'connector' = 'pulsar',
  'topic' = 'topic82547611',
  'value.format' = 'raw',
  'service-url' = 'pulsar://Timothys-MBP:6650',
  'admin-url' = 'http://Timothys-MBP:8080',
  'scan.startup.mode' = 'earliest',
  'generic' = 'true'
);
CREATE TABLE  `pulsar_table_1670269295` (
  `col_str` STRING,
  `col_int` INT,
  `col_ts` TIMESTAMP(3),
   WATERMARK FOR `col_ts` AS col_ts - INTERVAL '5' SECOND
) WITH (
  'format' = 'json' -- Data format
  -- 'json.encode.decimal-as-plain-number' = 'false' -- Optional flag to specify whether to encode all decimals as plain numbers instead of possible scientific notations, false by default.
  -- 'json.fail-on-missing-field' = 'false' -- Optional flag to specify whether to fail if a field is missing or not, false by default.
  -- 'json.ignore-parse-errors' = 'false' -- Optional flag to skip fields and rows with parse errors instead of failing; fields are set to null in case of errors, false by default.
  -- 'json.map-null-key.literal' = 'null' -- Optional flag to specify string literal for null keys when 'map-null-key.mode' is LITERAL, \"null\" by default.
  -- 'json.map-null-key.mode' = 'FAIL' -- Optional flag to control the handling mode when serializing null key for map data, FAIL by default. Option DROP will drop null key entries for map data. Option LITERAL will use 'map-null-key.literal' as key literal.
  -- 'json.timestamp-format.standard' = 'SQL' -- Optional flag to specify timestamp format, SQL by default. Option ISO-8601 will parse input timestamp in \"yyyy-MM-ddTHH:mm:ss.s{precision}\" format and output timestamp in the same format. Option SQL will parse input timestamp in \"yyyy-MM-dd HH:mm:ss.s{precision}\" format and output timestamp in the same format.
);
CREATE TABLE pulsar_citibikenyc (
	`num_docks_disabled` DOUBLE,
	`eightd_has_available_keys` STRING,
	`station_status` STRING,
	`last_reported` DOUBLE,
	`is_installed` DOUBLE,
	`num_ebikes_available` DOUBLE,
	`num_bikes_available` DOUBLE,
	`station_id` DOUBLE,
	`is_renting` DOUBLE,
	`is_returning` DOUBLE,
	`num_docks_available` DOUBLE,
	`num_bikes_disabled` DOUBLE,
	`legacy_id` DOUBLE,
	`valet` STRING,
	`eightd_active_station_services` STRING,
	`ts` DOUBLE,
	`uuid` STRING
) WITH (
  'connector' = 'pulsar',
  'topic' = 'persistent://public/default/citibikenyc',
  'value.format' = 'json',
  'service-url' = 'pulsar://Timothys-MBP:6650',
  'admin-url' = 'http://Timothys-MBP:8080',
  'scan.startup.mode' = 'earliest',
  'generic' = 'true'
);
CREATE TABLE pulsar_thermalsensors (
  `uuid` STRING NOT NULL,
  `ipaddress` STRING NOT NULL,
  `cputempf` INT NOT NULL,
  `runtime` INT NOT NULL,
  `host` STRING NOT NULL,
  `hostname` STRING NOT NULL,
  `macaddress` STRING NOT NULL,
  `endtime` STRING NOT NULL,
  `te` STRING NOT NULL,
  `cpu` FLOAT NOT NULL,
  `diskusage` STRING NOT NULL,
  `memory` FLOAT NOT NULL,
  `rowid` STRING NOT NULL,
  `systemtime` STRING NOT NULL,
  `ts` INT NOT NULL,
  `starttime` STRING NOT NULL,
  `datetimestamp` STRING NOT NULL,
  `temperature` FLOAT NOT NULL,
  `humidity` FLOAT NOT NULL,
  `co2` FLOAT NOT NULL,
  `totalvocppb` FLOAT NOT NULL,
  `equivalentco2ppm` FLOAT NOT NULL,
  `pressure` FLOAT NOT NULL,
  `temperatureicp` FLOAT NOT NULL
) WITH (
  'connector' = 'pulsar',
  'topic' = 'persistent://public/default/thermalsensors',
  'value.format' = 'json',
  'service-url' = 'pulsar://Timothys-MBP:6650',
  'admin-url' = 'http://Timothys-MBP:8080',
  'scan.startup.mode' = 'earliest',
  'generic' = 'true'
)
CREATE TABLE  fake_data (
city STRING )
WITH (
'connector' = 'faker',
'rows-per-second' = '1',
'fields.city.expression' = '#{Address.city}'
);
insert into pulsar_test
select * from fake_data;
select last_reported, num_bikes_available, station_id, num_docks_available, ts
from 
pulsar_citibikenyc;
select `systemtime`, `cputempf`, `cpu`, `humidity`, `co2`, `temperature`, `totalvocppb`, `equivalentco2ppm`, `pressure`, `temperatureicp`  
from  pulsar_thermalsensors
 
 
 Create a Materialized View 
 
 http://localhost:18131/api/v1/query/5202/thermal?key=c674a39b-921a-4759-a2fb-e599366cfe51
/api/v1/query/5202/thermal?key=c674a39b-921a-4759-a2fb-e599366cfe51
 
 
 Running SQL Stream Builder (Flink SQL) against Pulsar 
                                                          
   
  
 Containers Running Cloudera CSP CE in Docker 
  
 References 
 
 https://github.com/tspannhw/create-nifi-pulsar-flink-apps 
 https://github.com/streamnative/flink-example/blob/main/docker-compose.yml 
 https://docs.cloudera.com/csp-ce/latest/index.html 
 https://github.com/tspannhw/SpeakerProfile 
 
 
						
					
					... View more
				
			
			
			
			
			
			
			
			
			
		
		
			
				
						
							Labels:
						
						
		
	
					
			
		
	
	
	
	
				
		
	
	
			
    
	
		
		
		03-07-2022
	
		
		08:28 PM
	
	
	
	
	
	
	
	
	
	
	
	
	
	
		
	
				
		
			
					
	
		3 Kudos
		
	
				
		
	
		
					
							 
 Cloudera has added a StreamNative community-supported processor. 
 Reading OpenSea NFT with Apache NiFi, Apache Pulsar, and Friends. Meetup. StreamNative 
   
    
 Upcoming Meetup Demo 
 Apache Pulsar and Apache NiFi for Cloud Data Lakes 
 Pulsar Build 
 
 bin/pulsar-admin topics delete persistent://public/default/nft
bin/pulsar-admin topics delete persistent://public/default/crypto
bin/pulsar-admin topics create persistent://public/default/nft
bin/pulsar-admin topics create persistent://public/default/crypto
bin/pulsar-client consume "persistent://public/default/crypto" -s "nftreadercrypto" -n 0
bin/pulsar-client consume "persistent://public/default/nft" -s "nftyreader" -n 0
bin/pulsar-admin schemas get "persistent://public/default/nft" 
 
 Apache NiFi Flows 
 NiFi -> Pulsar Flow 
 Example NFT JSON Record 
 
 {
  "date" : "Thu, 24 Feb 2022 22:26:41 GMT",
  "short_description" : "",
  "featured" : "false",
  "image_thumbnail_url" : "https://lh3.googleusercontent.com/-dsYkPkvAQW6KvP0Nnbjecb4Pqo1RsLvTlKKDdSPiUR4LFGqTp3mvf2uOkLtF5p1c1eLIZi6S6WhqvOX4dY-MPhCy8zg8E5_PIUTrA=s128",
  "asset_contract_created_date" : "2022-02-21T01:50:07.184731",
  "asset_contract_owner" : "240677413",
  "image_preview_url" : "https://lh3.googleusercontent.com/-dsYkPkvAQW6KvP0Nnbjecb4Pqo1RsLvTlKKDdSPiUR4LFGqTp3mvf2uOkLtF5p1c1eLIZi6S6WhqvOX4dY-MPhCy8zg8E5_PIUTrA=s250",
  "asset_contract_symbol" : "Bunny Buddies",
  "twitter_username" : "",
  "description" : "The Bunny Buddies are 8888 NFTs created by Ryan Robinson. With this collection, the 3D artist explores duality and looks for the right balance between darkness and light.",
  "asset_contract_address" : "0x91cc3844b8271337679f8c00cb2d238886917d40",
  "external_url" : "https://bunny-buddies.com/",
  "token_id" : "4640",
  "asset_contract_name" : "Bunny Buddies",
  "asset_contract_nft_version" : "3.0",
  "asset_contract_description" : "The Bunny Buddies are a collection of 8888 collectibles designed by Ryan Robinson aka Rhabbitz (Yobritish). These bunnies with crazy aesthetics and strong personalities are on their way to take over the metaverse. \nTo know more about the project, visit our website : https://bunny-buddies.com",
  "asset_contract_external_link" : "https://bunny-buddies.com/",
  "id" : "304361312",
  "featured_image_url" : "https://lh3.googleusercontent.com/xE_aoM0UA6n1-Y5_eaI5YOq3EHJA8S7AKsQrGSminMv0kuWcg2ifdz0riEdpiusV6ZPWXEE_HY1JL7QksrpLlVpORVasp5HewjgBvQ=s300",
  "slug" : "bunny-buddies",
  "token_metadata" : "https://bunnybuddies.mypinata.cloud/ipfs/QmRRavBvkQLEd1F64QxtQoeUzPvUZFJo6dhmtT5wWbvnQR/4640",
  "asset_contract_schema_name" : "ERC721",
  "animation_url" : "https://storage.opensea.io/files/482cae8fbabbff6c1d208426ed8ad961.mp4",
  "num_sales" : "0",
  "image_url" : "https://lh3.googleusercontent.com/wqWbyWmm2jrGHmuCoudc96Cikocao2L4XE1NxHSdl87I9rWC_wvA2l0ubxtQqDBpeSibZbrOuJiWK0dNTOIMDaBRIWvIkoDraMb8PWM=s120",
  "asset_contract_default_to_fiat" : "false",
  "external_link" : "",
  "image_original_url" : "https://bunnybuddies.mypinata.cloud/ipfs/QmZ8xSD7Dt48FYinQ2SKK95q2boJk7QZHsWDCKfTXUYm3J/reveal.mp4",
  "asset_contract_payout_address" : "0xfde43ebd4f75960cdac70971b731e0bab144c8f2",
  "animation_original_url" : "https://bunnybuddies.mypinata.cloud/ipfs/QmZ8xSD7Dt48FYinQ2SKK95q2boJk7QZHsWDCKfTXUYm3J/reveal.mp4",
  "background_color" : "",
  "asset_contract_asset_contract_type" : "non-fungible",
  "name" : "#4641",
  "asset_contract_image_url" : "https://lh3.googleusercontent.com/wqWbyWmm2jrGHmuCoudc96Cikocao2L4XE1NxHSdl87I9rWC_wvA2l0ubxtQqDBpeSibZbrOuJiWK0dNTOIMDaBRIWvIkoDraMb8PWM=s120",
  "asset_contract_total_supply" : "0"
}
 
 
 StreamNative Cloud 
 Resources 
 
 https://github.com/tspannhw/FLiPN-Demos 
 Producing and Consuming Pulsar messages with Apache NiFi 
 https://github.com/streamnative/pulsar-nifi-bundle 
 Pulsar in Python on Pi for Sensors 
 https://www.linkedin.com/newsletters/flip-stack-weekly-6861715928728576000/ 
 https://github.com/tspannhw/SpeakerProfile/tree/main/2022/talks 
 https://www.slideshare.net/bunkertor/devfest-uk-ireland-using-apache-nifi-with-apache-pulsar-for-fast-data-onramp-2022 
 https://www.slideshare.net/bunkertor/data-science-online-camp-using-the-flipn-stack-for-edge-ai-flink-nifi-pulsar 
 https://www.slideshare.net/bunkertor/api-world-apache-nifi-101 
 https://www.slideshare.net/bunkertor/ai-dev-world-utilizing-apache-pulsar-apache-ni-fi-and-minifi-for-edgeai-iot-at-scale 
 https://github.com/tspannhw/EverythingApacheNiFi 
 https://www.slideshare.net/bunkertor/apachecon-2021-apache-nifi-101-introduction-and-best-practices 
 https://www.pulsardeveloper.com/ 
 https://arcade.sqlbits.com/session-details/?id=298240 
 
   
 Resources 
 
 https://github.com/tspannhw/awesome-nifi-pulsar/blob/main/README.md 
 Cloudera What's New in Flow Management 
 Cloudera CDF Datahub Supported Partner Components 
 https://github.com/streamnative/pulsar-nifi-bundle 
 https://github.com/tspannhw/FLiPN-NFT 
 https://www.datainmotion.dev/2021/11/producing-and-consuming-pulsar-messages.html 
 
           
						
					
					... View more
				
			
			
			
			
			
			
			
			
			
		 
        


