<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:camel="http://camel.apache.org/schema/spring"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<!-- Define a traditional camel context here -->
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<propertyPlaceholder id="properties" location="route.properties"/>
<endpoint id="csv2json" uri="dozer:csv2json2?sourceModel=org.acme.Customer&amp;targetModel=org.globex.Account&amp;marshalId=json&amp;unmarshalId=csv&amp;mappingFile=transformation.xml"/>
<!-- CSV input and JSON output data formats -->
<dataFormats>
<bindy classType="org.acme.Customer" id="csv" type="Csv"/>
<json id="json" library="Jackson"/>
</dataFormats>
<restConfiguration bindingMode="off" component="servlet" contextPath="/rest"/>
<rest apiDocs="true" id="rest-entry" path="/service">
<post id="rest-entry-post" uri="/customers">
<to uri="direct:inbox"/>
</post>
</rest>
<route id="_injectroute" streamCache="true">
<from id="_fromIR1" uri="direct:inbox"/>
<setExchangePattern id="_setExchangePattern3" pattern="InOnly"/>
<to id="_to1" uri="amqp:queue:inputQueue"/>
<transform id="_transform1">
<constant>Processed the customer data</constant>
</transform>
</route>
<route id="_route1" streamCache="true">
<!-- Consume messages from the input queue -->
<from id="_from1" uri="amqp:queue:inputQueue"/>
<onException id="_onException1">
<exception>java.lang.IllegalArgumentException</exception>
<handled>
<constant>true</constant>
</handled>
<log id="_log1" message=">> Exception : ${body}"/>
<setExchangePattern id="_setExchangePattern1" pattern="InOnly"/>
<to id="_to2" uri="direct:error"/>
</onException>
<split id="_split1">
<tokenize token=";"/>
<to id="_to3" ref="csv2json"/>
<setExchangePattern id="_setExchangePattern2" pattern="InOnly"/>
<to id="_to4" uri="amqp:queue:accountQueue"/>
<log id="_log2" message=">> Completed JSON: ${body}"/>
</split>
</route>
<!-- Publish the error code and error message on a topic -->
<route id="direct-error-queue">
<from id="_from2" uri="direct:error"/>
<setHeader headerName="error-code" id="_setHeader1">
<constant>111</constant>
</setHeader>
<setHeader headerName="error-message" id="_setHeader2">
<simple>${exception.message}</simple>
</setHeader>
<setHeader headerName="message" id="_setHeader3">
<simple>${body}</simple>
</setHeader>
<log id="_log3" logName="org.fuse.usecase"
loggingLevel="DEBUG" message="!!!! ERROR NOTIFICATION SEND"/>
<to id="error-queue-endpoint" uri="amqp:"/>
</route>
<!-- Consume the Topic message and publish it into the DB -->
<route id="error-queue-sql">
<from id="_from3" uri="amqp:"/>
<log id="_log4" logName="org.fuse.usecase"
loggingLevel="DEBUG" message="!!!! NOTIFICATION RECEIVED"/>
<log id="_log5" logName="org.fuse.usecase"
loggingLevel="DEBUG" message=">> Error code : ${header.error-code}, Error Message : ${header.error-message}"/>
<to id="_to5" uri="sql:insert into USECASE.T_ERROR(ERROR_CODE,ERROR_MESSAGE,MESSAGE,STATUS) values (:#${header.error-code}, :#${header.error-message}, :#${header.message}, 'ERROR');"/>
</route>
<!-- Republish corrected records and update their status to CLOSE -->
<route id="sql-queue-input">
<from id="_from4" uri="sql:select MESSAGE, ID from USECASE.T_ERROR where STATUS = 'FIXED' ?consumer.onConsume=update USECASE.T_ERROR set STATUS='CLOSE' where ID = :#ID"/>
<setBody id="_setBody1">
<simple>${body[message]}</simple>
</setBody>
<log id="_log6" message=">> Body : ${body}"/>
<to id="_to6" uri="amqp:queue:inputQueue"/>
</route>
</camelContext>
</beans>
Publish Records and Fix Errors
Overview
This exercise continues Split and Transform Rest. Here the individual JSON Account records are published to message queues, and records that fail processing are routed to an error queue. You also develop a strategy to correct erroneous messages and redeliver them.
The route implements an interception strategy for when IllegalArgumentException is thrown. The erroneous record is published to a special topic, with an error code and error message added.
A separate route listens for error messages. When this route is triggered, it saves the erroneous message and its error code into an error table with the status ERROR. To correct the record, you run a script against the error table and change the status to FIXED. Another route polls the database for FIXED records, republishes each message on the original queue, and updates its status to CLOSE.
Goals
- Publish records from the processing to an error topic
- Implement a custom error handling strategy to capture the erroneous messages
- Fix the messages and republish them
camelContext
Local Testing
$ cd amq-broker-7.2.2/
$ ./bin/artemis create --user admin --password password --role admin --allow-anonymous y ./instances/broker1
$ cd instances/broker1/
$ ./bin/artemis run
$ cd fuse-get-started/rest-publish-and-fix-errors/
$ mvn spring-boot:run
$ curl -k http://localhost:8080/rest/service/customers -X POST -d 'Rotobots,NA,true,Bill,Smith,100 N Park Ave.,Phoenix,AZ,85017,602-555-1100;BikesBikesBikes,NA,true,George,Jungle,1101 Smith St.,Raleigh,NC,27519,919-555-0800;CloudyCloud,EU,true,Fred,Quicksand,202 Barney Blvd.,Rock City,MI,19728,313-555-1234;ErrorError,,,EU,true,Fred,Quicksand,202 Barney Blvd.,Rock City,MI,19728,313-555-1234' -H 'content-type: text/html'
Processed the customer data
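The POST body above carries several records separated by `;`, which the route splits with `<tokenize token=";"/>` before transforming each one. As a rough way to see what each split record looks like, the sketch below splits the same payload in the shell and counts the comma-separated fields per record. The `count_fields` helper is hypothetical and not part of the project. It assumes no field contains a quoted comma.

```shell
# Hypothetical helper (not part of the project): split a payload on ';'
# the way the route's tokenizer does, then print "<field count> <first field>"
# for every record. Assumes no quoted commas inside fields.
count_fields() {
  printf '%s\n' "$1" | tr ';' '\n' | awk -F',' '{ print NF, $1 }'
}

count_fields 'Rotobots,NA,true,Bill,Smith,100 N Park Ave.,Phoenix,AZ,85017,602-555-1100;BikesBikesBikes,NA,true,George,Jungle,1101 Smith St.,Raleigh,NC,27519,919-555-0800;CloudyCloud,EU,true,Fred,Quicksand,202 Barney Blvd.,Rock City,MI,19728,313-555-1234;ErrorError,,,EU,true,Fred,Quicksand,202 Barney Blvd.,Rock City,MI,19728,313-555-1234'
# 10 Rotobots
# 10 BikesBikesBikes
# 10 CloudyCloud
# 12 ErrorError
```

The last record has two extra empty fields compared with the others, which is presumably why it is the one that fails and is routed to the error handling path.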
Deploy to OpenShift
$ oc new-app --template=amq-broker-72-basic \
-e AMQ_PROTOCOL=openwire,amqp,stomp,mqtt,hornetq \
-e AMQ_USER=admin \
-e AMQ_PASSWORD=password \
-e AMQ_ROLE=admin
$ oc new-app \
-e POSTGRESQL_USER=postgres \
-e POSTGRESQL_PASSWORD=postgres \
-e POSTGRESQL_DATABASE=sampledb \
postgresql-persistent
$ oc get pods | grep postgresql
$ oc rsh postgresql-1-wczss
sh-4.2$ createdb -h localhost -p 5432 -U postgres sampledb
sh-4.2$ PGPASSWORD=$POSTGRESQL_PASSWORD psql -h postgresql $POSTGRESQL_DATABASE $POSTGRESQL_USER
psql (9.6.10)
Type "help" for help.
sampledb=#
CREATE SCHEMA USECASE;
CREATE TABLE USECASE.T_ACCOUNT (
id SERIAL PRIMARY KEY,
CLIENT_ID integer,
SALES_CONTACT VARCHAR(30),
COMPANY_NAME VARCHAR(50),
COMPANY_GEO CHAR(20) ,
COMPANY_ACTIVE BOOLEAN,
CONTACT_FIRST_NAME VARCHAR(35),
CONTACT_LAST_NAME VARCHAR(35),
CONTACT_ADDRESS VARCHAR(255),
CONTACT_CITY VARCHAR(40),
CONTACT_STATE VARCHAR(40),
CONTACT_ZIP VARCHAR(10),
CONTACT_EMAIL VARCHAR(60),
CONTACT_PHONE VARCHAR(35),
CREATION_DATE TIMESTAMP,
CREATION_USER VARCHAR(255)
);
CREATE TABLE USECASE.T_ERROR (
ID SERIAL PRIMARY KEY,
ERROR_CODE VARCHAR(4) NOT NULL,
ERROR_MESSAGE VARCHAR(255),
MESSAGE VARCHAR(512),
STATUS CHAR(6)
);
$ cd fuse-get-started/rest-publish-and-fix-errors/
$ mvn fabric8:deploy -Popenshift
$ oc expose svc rest-publish-and-fix-errors
$ curl -k http://rest-publish-and-fix-errors-user4-fuse.apps.0d94.openshift.opentlc.com/rest/service/customers -X POST -d 'Rotobots,NA,true,Bill,Smith,100 N Park Ave.,Phoenix,AZ,85017,602-555-1100;BikesBikesBikes,NA,true,George,Jungle,1101 Smith St.,Raleigh,NC,27519,919-555-0800;CloudyCloud,EU,true,Fred,Quicksand,202 Barney Blvd.,Rock City,MI,19728,313-555-1234;ErrorError,,,EU,true,Fred,Quicksand,202 Barney Blvd.,Rock City,MI,19728,313-555-1234' -H 'content-type: text/html'
Processed the customer data
sampledb=# SELECT id, error_code, status FROM USECASE.T_ERROR;
id | error_code | status
----+------------+--------
1 | 111 | ERROR
(1 row)
sampledb=# UPDATE USECASE.T_ERROR SET STATUS='FIXED' WHERE ID=1;
UPDATE 1
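The UPDATE above changes only the status, so the `sql-queue-input` route republishes the stored MESSAGE unchanged. If the stored record is still malformed, it would likely fail again. A minimal sketch of a fix that also corrects the message is shown below. The `fix_sql` helper and the corrected record are assumptions for illustration only. The sketch also assumes that MESSAGE holds the raw CSV record and that the ID is numeric.

```shell
# Hypothetical helper (not part of the project): print an UPDATE statement
# that replaces the stored message and marks the row as FIXED.
# $1 = row ID (assumed numeric), $2 = corrected message text.
fix_sql() {
  id="$1"
  # Double any single quotes so the message is a valid SQL string literal.
  msg=$(printf '%s' "$2" | sed "s/'/''/g")
  printf "UPDATE USECASE.T_ERROR SET MESSAGE='%s', STATUS='FIXED' WHERE ID=%s;\n" "$msg" "$id"
}

# Example: the corrected record below is an assumed fix (empty fields removed).
fix_sql 1 'ErrorError,EU,true,Fred,Quicksand,202 Barney Blvd.,Rock City,MI,19728,313-555-1234'
```

The printed statement can be pasted at the `sampledb=#` prompt or piped into the `psql` command used earlier. After the polling route picks the row up, its status should change to CLOSE.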