Friday, May 27, 2016

Camel Kafka Integration Code




<!-- Helper bean referenced by the kafkaProducer route (ref="basic", method="converter"). -->
<bean
    id="basic"
    class="com.jet.camel.kafka.BodyConverter"/>


  <camel:camelContext xmlns="http://camel.apache.org/schema/spring" trace="false" id="kafkaContextProducer">
  <camel:propertyPlaceholder location="classpath:kafka.config.properties" cache="true" id="ppholder1" />


  <camel:route id="kafkaProducer" streamCache="true">
  <camel:from uri="direct:setData"/>
  <camel:log message="body :: ${body}  headers ::  ${headers}  "/>
<!-- <camel:convertBodyTo type="java.lang.String"/> -->

<bean ref="basic" method="converter"/>
<camel:log message="Producer body :: ${body} "/>
<camel:to uri="kafka:{{kafka.server}}:{{kafka.port}}?groupId=visitor&topic={{kafka.topic}}&producerType=sync&serializerClass=kafka.serializer.StringEncoder"/>
  </camel:route>


  <camel:route id="kafkaProducerVM"  streamCache="true">
  <camel:from uri="vm:tokafka?waitForTaskToComplete=Never"/>
<camel:doTry>
<camel:to uri="direct:setData"/>
<camel:doCatch>
<camel:exception>java.lang.Exception</camel:exception>
<camel:setBody> <camel:simple>BGW-Kafka#${exception.stacktrace} </camel:simple> </camel:setBody>
<camel:to uri="file:/opt/fuse/app/fuse/data/mislog?fileName=ErrorReport-${date:now:yyyyMMdd}.log&fileExist=Append" />
<camel:to uri="stream:out"/>
</camel:doCatch>
</camel:doTry>
  </camel:route>
 
  </camel:camelContext>

Camel Consumer ::

<!-- Helper bean used by the consumer route (ref="basic", method="parseBody"). -->
<bean
    id="basic"
    class="com.jet.camel.kafka.BodyConverter"/>
 
<!--
  MySQL data source for the ICon DB.
  Host, port, schema and credentials come from the db.* placeholders.
-->
<bean id="poolDS"
      class="com.mysql.jdbc.jdbc2.optional.MysqlConnectionPoolDataSource">
  <property name="url"      value="jdbc:mysql://${db.host}:${db.port}/${db.name}"/>
  <property name="user"     value="${db.user}"/>
  <property name="password" value="${db.password}"/>
</bean>

<!--
  Camel SQL component registered under the id "sql", backed by the poolDS data source.
  The property is spelled "dataSource" (canonical JavaBean name for setDataSource)
  rather than relying on lenient matching of a capitalised name.
-->
<bean id="sql" class="org.apache.camel.component.sql.SqlComponent">
  <property name="dataSource" ref="poolDS"/>
</bean>

<!--
  Second MySQL data source, using the db.bill* schema and credentials on the same
  db.host / db.port as poolDS.
  NOTE(review): the original comment called this the "ICon DB" as well; presumably
  this is the billing (BGW) database. Confirm and adjust the wording.
-->
<bean id="poolDSBGW"
      class="com.mysql.jdbc.jdbc2.optional.MysqlConnectionPoolDataSource">
  <property name="url"      value="jdbc:mysql://${db.host}:${db.port}/${db.billname}"/>
  <property name="user"     value="${db.billuser}"/>
  <property name="password" value="${db.billpassword}"/>
</bean>


    <bean id="sqlBGW" class="org.apache.camel.component.sql.SqlComponent">
<property name="DataSource" ref="poolDSBGW" />
</bean>


  <osgix:cm-properties id="preProps" persistent-id="kafka.properties" />
<ctx:property-placeholder properties-ref="preProps" />

  <camelContext xmlns="http://camel.apache.org/schema/spring" trace="true" id="kafkaContext">
  <!--
    Camel-level property source for the {{...}} placeholders used in endpoint URIs
    (for example {{kafka.server}} and {{zookeeper.host}}). Two classpath files are
    listed, separated by a comma; which keys live in which file is not visible here.
  -->
  <camel:propertyPlaceholder location="classpath:kafka.config.properties,classpath:sql.properties" cache="true" id="ppholder1" />
  <camel:route id="mainKafkaRoute">
  <camel:from uri="kafka:{{kafka.server}}:{{kafka.port}}?groupId=visitor&topic={{kafka.topic}}&
zookeeperHost={{zookeeper.host}}&zookeeperPort={{zookeeper.port}}
  &autoOffsetReset=smallest&consumersCount=1"/>
 <camel:doTry>
 <camel:log message="Kafka Consumer Body ::  ${body}"/>
 <camel:to uri="file:/opt/fuse/app/fuse/data/mislog?fileName=KafKaConsumer-${date:now:yyyyMMdd}.log&fileExist=Append" />
 <bean ref="basic" method="parseBody"/>
 <camel:log message="Kafka Consumer headers ::  ${headers}"/>
 <camel:to uri="direct:callQuery"/>
 
<camel:doCatch>
<camel:exception>java.lang.Exception</camel:exception>
<camel:setBody> <camel:simple>$BGW-Kafka#${header.nowTime}#${headers}#${exception.stacktrace}</camel:simple> </camel:setBody>
<camel:to uri="file:/opt/fuse/app/fuse/data/mislog?fileName=ErrorReport-${date:now:yyyyMMdd}.log&fileExist=Append"/>
</camel:doCatch>
 </camel:doTry>
  </camel:route>


1 comment: