Member since: 04-19-2023
Posts: 22
Kudos Received: 2
Solutions: 2
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 927 | 05-08-2023 12:30 AM
 | 2070 | 04-20-2023 07:31 AM
04-27-2023
01:12 AM
This method does not work: PutHDFS creates a directory whose name carries the extension (directory1.parquet), and inside that directory there is still a file without an extension. I tried /user/test/${now():toNumber()}.parquet, but I need plain files in the same test directory, not directories named .parquet.
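If I understand PutHDFS correctly, its Directory property is always treated as a directory path, and the output file takes its name from the FlowFile's filename attribute; so the .parquet extension belongs on filename (set upstream, e.g. in UpdateAttribute), not on Directory. A minimal sketch of the intended naming, assuming the NiFi Expression Language semantics of ${now():toNumber()} (epoch milliseconds):

```python
import time

# ${now():toNumber()} in NiFi Expression Language is epoch milliseconds.
# This is the value the FlowFile's 'filename' attribute should hold,
# while PutHDFS's Directory property stays a plain path like /user/test.
filename = f"{int(time.time() * 1000)}.parquet"
print(f"/user/test/{filename}")  # e.g. /user/test/1682495520123.parquet
```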
04-26-2023
08:19 AM
Can the PutHDFS processor itself transform a file to Parquet when writing to HDFS, or do I need PutParquet to write Parquet to HDFS?
04-26-2023
08:00 AM
I figured out that error. Now I have another problem: I get files in HDFS, but they have no .parquet extension. I don't understand what kind of files these are, given that I write them everywhere without compression.
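One way to tell what actually landed in HDFS: a Parquet file begins and ends with the 4-byte magic PAR1, regardless of its name, so the missing extension by itself proves nothing. A minimal check, assuming the file has been copied locally first (the path below is a placeholder):

```python
# Check for Parquet's magic bytes; the file name/extension is irrelevant.
def is_parquet(path: str) -> bool:
    with open(path, "rb") as f:
        head = f.read(4)
        f.seek(-4, 2)  # seek to the last 4 bytes of the file
        tail = f.read(4)
    return head == b"PAR1" and tail == b"PAR1"

# Placeholder name: substitute a file fetched with `hdfs dfs -get`.
print(is_parquet("part-0000"))
```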
04-26-2023
05:12 AM
ConvertRecord[id=bc90b7d4-0187-1000-0000-00003eadf340] Failed to process FlowFile[filename=043b3d7b-5de0-4f7e-842f-2cbe5c972ace]; will route to failure: org.apache.parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: required group file {

I'm now trying a different approach, with this schema in AvroSchemaRegistry:

{
  "type": "record",
  "name": "kafka_record",
  "fields": [
    {"name": "upstream_response_time", "type": "string"},
    {"name": "ssl_cipher", "type": "string"},
    {"name": "upstream", "type": "string"},
    {"name": "log", "type": {
      "type": "record", "name": "log", "fields": [
        {"name": "offset", "type": "long"},
        {"name": "file", "type": {"type": "record", "name": "file", "fields": []}}
      ]
    }},
    {"name": "method", "type": "string"},
    {"name": "clientip", "type": "string"},
    {"name": "user_agent", "type": "string"},
    {"name": "realip", "type": "string"},
    {"name": "http_accept_encoding", "type": "string"},
    {"name": "country", "type": "string"},
    {"name": "timestamp_delivery", "type": "string"},
    {"name": "http_accept_language", "type": "string"},
    {"name": "scheme", "type": "string"},
    {"name": "request_id", "type": "string"},
    {"name": "http_referer", "type": "string"},
    {"name": "req_lengths", "type": "string"},
    {"name": "server_protocol", "type": "string"},
    {"name": "request", "type": "string"},
    {"name": "request_time", "type": "string"},
    {"name": "ssl_protocol", "type": "string"},
    {"name": "host", "type": "string"},
    {"name": "cache", "type": "string"},
    {"name": "input", "type": {"type": "record", "name": "input", "fields": []}},
    {"name": "agent", "type": {"type": "record", "name": "agent", "fields": []}},
    {"name": "hostname_logstash", "type": "string"},
    {"name": "x_requested_with", "type": "string"},
    {"name": "status", "type": "string"},
    {"name": "project_id", "type": "string"},
    {"name": "cookie_session", "type": "string"},
    {"name": "timestamp", "type": "string"},
    {"name": "serverip", "type": "string"},
    {"name": "geo", "type": "string"},
    {"name": "source", "type": "string"},
    {"name": "upstream_status", "type": "string"},
    {"name": "upstream_port", "type": "string"},
    {"name": "hostname", "type": "string"},
    {"name": "size", "type": "string"},
    {"name": "ssl_ja3_hash", "type": "string"},
    {"name": "sni", "type": "string"},
    {"name": "http_accept", "type": "string"},
    {"name": "location_id", "type": "string"},
    {"name": "server_port", "type": "string"},
    {"name": "timestamp_record", "type": "string"},
    {"name": "param_request", "type": "string"}
  ]
}
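The exception points at the records declared with "fields": [] (log.file, input, agent): Parquet has no encoding for a group with zero columns, so those records need at least one field, or should be mapped to a scalar type such as string. A minimal reproduction of the same constraint outside NiFi, using pyarrow (the library and file name are my choices for illustration):

```python
import pyarrow as pa
import pyarrow.parquet as pq

# An Arrow struct with no children mirrors the empty Avro record "file".
table = pa.table({"file": pa.array([{}], type=pa.struct([]))})
try:
    pq.write_table(table, "demo.parquet")
except Exception as exc:
    # pyarrow also rejects empty structs when writing Parquet
    print(exc)
```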
04-26-2023
03:47 AM
I don't have a JSON schema; I want the flow to infer a schema from the input data and write it to Parquet with my settings, but this does not work.
04-26-2023
02:25 AM
My settings.

JsonTreeReader:
- Schema Access Strategy: Infer Schema
- Schema Inference Cache: No value set
- Starting Field Strategy: Root Node
- Date Format: yyyy-MM-dd
- Time Format: HH:mm:ss
- Timestamp Format: yyyy-MM-dd'T'HH:mm:ss.SSSX

ParquetRecordSetWriter:
- Schema Write Strategy: Do Not Write Schema
- Schema Cache: No value set
- Schema Access Strategy: Inherit Record Schema
- Cache Size: 1000
- Compression Type: UNCOMPRESSED
- Row Group Size: No value set
- Page Size: No value set
- Dictionary Page Size: No value set
- Max Padding Size: No value set
- Enable Dictionary Encoding: No value set
- Enable Validation: No value set
- Writer Version: No value set
- Avro Write Old List Structure: false
- Avro Add List Element Records: false
- INT96 Fields: No value set

I don't have a JSON schema; I want the flow to infer a schema from the input data and write Parquet with these settings, but it does not work.
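As I read these settings, "Infer Schema" on the reader plus "Inherit Record Schema" on the writer means the Parquet writer receives exactly the shape inferred from the incoming JSON, so an empty JSON object arrives as a record with no fields, which is what the InvalidSchemaException in the post above complains about. A sketch of that inferred shape in Arrow terms (pyarrow again; the field names and types are assumed for illustration):

```python
import pyarrow as pa

# What schema inference yields for input like {"log": {"offset": 1, "file": {}}}:
# the empty object becomes a struct with no fields, and that empty group is
# exactly what the Parquet writer later refuses to serialize.
inferred = pa.schema([
    ("log", pa.struct([
        ("offset", pa.int64()),
        ("file", pa.struct([])),  # inferred from the empty JSON object
    ])),
])
print(inferred)
```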
04-25-2023
11:24 PM
The process is not clear to me. I managed to shorten it to Consumer_kafka -> MergeContent -> PutHDFS, but the output in HDFS is GZ, not Parquet. I need to somehow change MergeContent so that it merges the FlowFiles from Kafka into one large file and the result is Parquet. How can I make this happen?
04-25-2023
11:23 PM
The process is not clear to me. I managed to shorten it to Consumer_kafka -> MergeContent -> PutHDFS, but the output in HDFS is GZ, not Parquet. I need MergeContent to merge FlowFiles like the one below from Kafka into one large file. How do I get Parquet out of this?

FlowFile Properties
- Key: 'entryDate' Value: 'Tue Apr 25 09:14:38 MSK 2023'
- Key: 'lineageStartDate' Value: 'Tue Apr 25 09:14:38 MSK 2023'
- Key: 'fileSize' Value: '1305'

FlowFile Attribute Map Content
- Key: 'filename' Value: '597dd31f-294d3-f5301d4c446b'
- Key: 'kafka.consumer.id' Value: 'readtopicnifi'
- Key: 'kafka.consumer.offsets.committed' Value: 'true'
- Key: 'kafka.leader.epoch' Value: '2'
- Key: 'kafka.max.offset' Value: '11016028'
- Key: 'kafka.offset' Value: '110168'
- Key: 'kafka.partition' Value: '0'
- Key: 'kafka.timestamp' Value: '1681997605'
- Key: 'kafka.topic' Value: 'ng'
- Key: 'path' Value: './'
- Key: 'uuid' Value: '091096d655236-4b0f-a0bf-55у3d3e819'

MergeRecord with a JsonTreeReader record reader doesn't work because the FlowFile comes from the Kafka processor. How can this be implemented for Parquet?
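Outside NiFi, the step one would expect MergeRecord (JSON reader, Parquet writer) to perform looks roughly like this; the records below are invented placeholders for the Kafka messages, not real topic data:

```python
import pyarrow as pa
import pyarrow.parquet as pq

# Merge many small JSON-like records into a single Parquet file: the rough
# equivalent of MergeRecord with a JsonTreeReader and ParquetRecordSetWriter.
records = [
    {"status": "200", "host": "a", "kafka.offset": 110168},
    {"status": "404", "host": "b", "kafka.offset": 110169},
]
table = pa.Table.from_pylist(records)
pq.write_table(table, "merged.parquet", compression="snappy")
print(pq.read_table("merged.parquet").num_rows)  # 2
```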
04-20-2023
07:31 AM
1 Kudo
The issue was resolved: the problem was in the settings for the TLS protocol version.
04-20-2023
06:57 AM
2023-04-20 16:51:55,924 ERROR [Timer-Driven Process Thread-10] o.a.n.p.kafka.pubsub.ConsumeKafka_2_6 [ConsumeKafka_2_6[id=9da42b1a-0187-1000-ffff-ffffb41254ef], org.apache.nifi.processors.kafka.pubsub.ConsumerPool$SimpleConsumerLease@475bfba5, org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed] Exception while interacting with Kafka so will close the lease {} due to {}
org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: No subject alternative names matching IP address 10.1 found
    at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:131)
    at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:353)
    at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:296)
    at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:291)
    at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1357)
    at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.onConsumeCertificate(CertificateMessage.java:1232)
    at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.consume(CertificateMessage.java:1175)
    at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:392)
    at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:443)
    at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1076)
    at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1063)
    at java.base/java.security.AccessController.doPrivileged(Native Method)
    at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:1010)
    at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:430)
    at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:514)
    at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:368)
    at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:291)
    at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:173)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:547)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:245)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:480)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
    at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.poll(ConsumerLease.java:220)
    at org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_6.onTrigger(ConsumeKafka_2_6.java:479)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1357)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:246)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.security.cert.CertificateException: No subject alternative names matching IP address 10.1 found
    at java.base/sun.security.util.HostnameChecker.matchIP(HostnameChecker.java:165)
    at java.base/sun.security.util.HostnameChecker.match(HostnameChecker.java:101)
    at java.base/sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:455)
    at java.base/sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:429)
    at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:283)
    at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:141)
    at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1335)
    ... 37 common frames omitted
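For anyone hitting the same trace: the root cause says the broker certificate has no subject alternative name matching the IP address used to connect. A quick way to inspect a broker certificate's SANs (host and port are placeholders for the Kafka bootstrap address; uses the third-party cryptography package):

```python
import ssl
from cryptography import x509  # pip install cryptography

# Fetch the broker's certificate and list its subject alternative names;
# the handshake fails when the address you connect with is not among them.
pem = ssl.get_server_certificate(("broker.example.com", 9093))
cert = x509.load_pem_x509_certificate(pem.encode())
san = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
print("DNS:", san.value.get_values_for_type(x509.DNSName))
print("IP:", san.value.get_values_for_type(x509.IPAddress))
```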