<bean id="basic" class="com.jet.camel.kafka.BodyConverter"/>
<camel:camelContext xmlns="http://camel.apache.org/schema/spring" trace="false" id="kafkaContextProducer">
<camel:propertyPlaceholder location="classpath:kafka.config.properties" cache="true" id="ppholder1" />
<camel:route id="kafkaProducer" streamCache="true">
<camel:from uri="direct:setData"/>
<camel:log message="body :: ${body} headers :: ${headers} "/>
<!-- <camel:convertBodyTo type="java.lang.String"/> -->
<bean ref="basic" method="converter"/>
<camel:log message="Producer body :: ${body} "/>
<camel:to uri="kafka:{{kafka.server}}:{{kafka.port}}?groupId=visitor&topic={{kafka.topic}}&producerType=sync&serializerClass=kafka.serializer.StringEncoder"/>
</camel:route>
<camel:route id="kafkaProducerVM" streamCache="true">
<camel:from uri="vm:tokafka?waitForTaskToComplete=Never"/>
<camel:doTry>
<camel:to uri="direct:setData"/>
<camel:doCatch>
<camel:exception>java.lang.Exception</camel:exception>
<camel:setBody> <camel:simple>BGW-Kafka#${exception.stacktrace} </camel:simple> </camel:setBody>
<camel:to uri="file:/opt/fuse/app/fuse/data/mislog?fileName=ErrorReport-${date:now:yyyyMMdd}.log&fileExist=Append" />
<camel:to uri="stream:out"/>
</camel:doCatch>
</camel:doTry>
</camel:route>
</camel:camelContext>
Camel Consumer ::
<!-- Java Class for all necessary Functionality -->
<bean id="basic" class="com.jet.camel.kafka.BodyConverter"/>
<!-- Bean For Connecting With ICon DB -->
<bean id="poolDS" class="com.mysql.jdbc.jdbc2.optional.MysqlConnectionPoolDataSource">
<property name="url" value="jdbc:mysql://${db.host}:${db.port}/${db.name}" />
<property name="user" value="${db.user}" />
<property name="password" value="${db.password}" />
</bean>
<bean id="sql" class="org.apache.camel.component.sql.SqlComponent">
<property name="DataSource" ref="poolDS" />
</bean>
<!-- Bean For Connecting With ICon DB -->
<bean id="poolDSBGW" class="com.mysql.jdbc.jdbc2.optional.MysqlConnectionPoolDataSource">
<property name="url" value="jdbc:mysql://${db.host}:${db.port}/${db.billname}" />
<property name="user" value="${db.billuser}" />
<property name="password" value="${db.billpassword}" />
</bean>
<bean id="sqlBGW" class="org.apache.camel.component.sql.SqlComponent">
<property name="DataSource" ref="poolDSBGW" />
</bean>
<osgix:cm-properties id="preProps" persistent-id="kafka.properties" />
<ctx:property-placeholder properties-ref="preProps" />
<camelContext xmlns="http://camel.apache.org/schema/spring" trace="true" id="kafkaContext">
<camel:propertyPlaceholder location="classpath:kafka.config.properties,classpath:sql.properties" cache="true" id="ppholder1" />
<camel:route id="mainKafkaRoute">
<camel:from uri="kafka:{{kafka.server}}:{{kafka.port}}?groupId=visitor&topic={{kafka.topic}}&
zookeeperHost={{zookeeper.host}}&zookeeperPort={{zookeeper.port}}
&autoOffsetReset=smallest&consumersCount=1"/>
<camel:doTry>
<camel:log message="Kafka Consumer Body :: ${body}"/>
<camel:to uri="file:/opt/fuse/app/fuse/data/mislog?fileName=KafKaConsumer-${date:now:yyyyMMdd}.log&fileExist=Append" />
<bean ref="basic" method="parseBody"/>
<camel:log message="Kafka Consumer headers :: ${headers}"/>
<camel:to uri="direct:callQuery"/>
<camel:doCatch>
<camel:exception>java.lang.Exception</camel:exception>
<camel:setBody> <camel:simple>$BGW-Kafka#${header.nowTime}#${headers}#${exception.stacktrace}</camel:simple> </camel:setBody>
<camel:to uri="file:/opt/fuse/app/fuse/data/mislog?fileName=ErrorReport-${date:now:yyyyMMdd}.log&fileExist=Append"/>
</camel:doCatch>
</camel:doTry>
</camel:route>
Why do producer need a groupId ?
ReplyDelete