Java
nuclear_kote, 2019-02-12 21:28:11

How do I send a nested Cap'n Proto structure through Kafka into ClickHouse?

I created a table in ClickHouse:

CREATE TABLE queue (
    foo Nested (
        code String,
        value UInt32
    ),
    bla UInt32
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'localhost:9092',
    kafka_topic_list = 'test',
    kafka_group_name = 'group1',
    kafka_format = 'CapnProto',
    kafka_schema = 'bar:BarStruct',
    kafka_num_consumers = 1;

I created a Cap'n Proto schema:
struct BarStruct
{
    foo @0 :FooStruct;
    bla @1 :UInt32;
}

struct FooStruct
{
    code @0 :Text;
    value @1 :UInt32;
}

I am trying to send the message from Java to Kafka like this:
public class KafkaTest {

    public static Producer<Long, byte[]> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "testClient");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        return new KafkaProducer<>(props);
    }

    public static void main(String[] args) throws InterruptedException {
        org.capnproto.MessageBuilder message = new org.capnproto.MessageBuilder();
        Bar.BarStruct.Builder barStruct = message.initRoot(Bar.BarStruct.factory);
        barStruct.setBla(32);
        Bar.FooStruct.Builder fooBuilder = barStruct.initFoo();
        fooBuilder.setCode("Blabla");
        fooBuilder.setValue(42);
        final Producer<Long, byte[]> producer = createProducer();
        while (true) {
            for (ByteBuffer buffer : message.getSegmentsForOutput()) {
                final byte[] bytes = buffer.array();
                System.out.println(Arrays.toString(bytes));
                ProducerRecord<Long, byte[]> producerRecord = new ProducerRecord<>("test", bytes);
                producer.send(producerRecord);
            }
            TimeUnit.SECONDS.sleep(1);
        }
    }

}

As a result, a SELECT from the table first fails with an error and then returns nothing:
host :) select * from queue

SELECT *
FROM queue

Received exception from server (version 19.1.6):
Code: 33. DB::Exception: Received from localhost:9090, ::1. DB::Exception: Cannot read all data. Bytes read: 8192. Bytes expected: 524304.: (Input format doesn't allow to skip errors): (at row 1)
. 

0 rows in set. Elapsed: 0.504 sec. 

host :) select * from queue

SELECT *
FROM queue

Ok.
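
One thing worth checking, offered as an assumption rather than a confirmed diagnosis: the producer above sends each segment's whole backing array (`buffer.array()`) as a separate Kafka record, with no Cap'n Proto stream framing in front of it. The standard Cap'n Proto stream format starts with a segment table (segment count minus one, then each segment's size in 8-byte words, all little-endian 32-bit values, padded to an 8-byte boundary), followed by the segment data. A reader that expects this table would interpret the first bytes of a raw segment as sizes, which could explain the "Bytes expected" mismatch. Below is a minimal JDK-only sketch of that framing. The class name `CapnpFraming` and the method `frame` are hypothetical, and the sketch assumes each buffer's `remaining()` covers exactly the used part of the segment.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class CapnpFraming {

    /**
     * Builds one byte array in the Cap'n Proto stream format:
     * segment table first, then the segment contents.
     * Assumes every segment's remaining() is a multiple of 8 bytes.
     */
    public static byte[] frame(ByteBuffer[] segments) {
        if (segments.length == 0) {
            throw new IllegalArgumentException("a message needs at least one segment");
        }
        // Table: one uint32 for (count - 1), one uint32 per segment size,
        // padded with 4 zero bytes if needed to reach an 8-byte boundary.
        int headerBytes = 4 * (1 + segments.length);
        if (headerBytes % 8 != 0) {
            headerBytes += 4;
        }
        int totalBytes = headerBytes;
        for (ByteBuffer segment : segments) {
            totalBytes += segment.remaining();
        }

        ByteBuffer out = ByteBuffer.allocate(totalBytes).order(ByteOrder.LITTLE_ENDIAN);
        out.putInt(segments.length - 1);
        for (ByteBuffer segment : segments) {
            out.putInt(segment.remaining() / 8); // size in 8-byte words
        }
        out.position(headerBytes); // padding bytes are already zero
        for (ByteBuffer segment : segments) {
            // duplicate() leaves the caller's position and limit untouched
            out.put(segment.duplicate());
        }
        return out.array();
    }
}
```

With a helper like this, the inner `for` loop in `main` would collapse to a single record per message, for example `producer.send(new ProducerRecord<>("test", CapnpFraming.frame(message.getSegmentsForOutput())))`. The capnproto-java library also ships `org.capnproto.Serialize.write`, which takes a `WritableByteChannel` and a `MessageBuilder`; writing through it into a `ByteArrayOutputStream` wrapped with `Channels.newChannel` should give the same bytes without a hand-written helper. Whether this resolves the ClickHouse error for the `Nested` column is not verified here.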
