Infinispan
Since Camel 2.13
Both producer and consumer are supported
This component allows you to interact with the Infinispan distributed data grid/cache using the Hot Rod protocol. Infinispan is an extremely scalable, highly available key/value data store and data grid platform written in Java.
If you use Maven, you must add the following dependency to your pom.xml:
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-infinispan</artifactId>
<version>x.x.x</version>
<!-- use the same version as your Camel core version -->
</dependency>
URI format
infinispan://cacheName?[options]
The producer sends messages to a remote cache using the Hot Rod protocol. The consumer listens for events from a remote cache using the Hot Rod protocol.
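The URI format above can be illustrated with a small helper that assembles the endpoint string. This is only a sketch in plain Java: the class and method names are hypothetical, it is not part of Camel, and it assumes option values need no URL encoding. It shows that the first option follows `?` and every later option follows `&`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not a Camel API: builds an Infinispan endpoint URI string.
final class InfinispanUriSketch {

    // Returns "infinispan://<cacheName>" followed by "?name=value&name=value" when options are given.
    static String endpointUri(String cacheName, Map<String, String> options) {
        StringBuilder uri = new StringBuilder("infinispan://").append(cacheName);
        if (!options.isEmpty()) {
            // The joiner puts '?' before the first option and '&' between options.
            StringJoiner query = new StringJoiner("&", "?", "");
            options.forEach((name, value) -> query.add(name + "=" + value));
            uri.append(query);
        }
        return uri.toString();
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("cacheContainer", "#cacheContainer");
        options.put("embeddingStoreDimension", "384");
        System.out.println(endpointUri("myCacheName", options));
    }
}
```

The resulting string can be passed to `from(...)` or `to(...)` in a route.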
Configuring Options
Camel components are configured on two separate levels:
-
component level
-
endpoint level
Configuring Component Options
At the component level, you set general and shared configurations that are then inherited by the endpoints. It is the highest configuration level.
For example, a component may have security settings, credentials for authentication, URLs for network connections, and so forth.
Some components have only a few options, and others may have many. Because components typically have preconfigured defaults that are commonly used, you often need to configure only a few options on a component, or none at all.
You can configure components using:
-
the Component DSL.
-
in a configuration file (application.properties, *.yaml files, etc.).
-
directly in the Java code.
Configuring Endpoint Options
You usually spend more time setting up endpoints because they have many options. These options help you customize what you want the endpoint to do. The options are also categorized into whether the endpoint is used as a consumer (from), as a producer (to), or both.
Configuring endpoints is most often done directly in the endpoint URI as path and query parameters. You can also use the Endpoint DSL and DataFormat DSL as a type safe way of configuring endpoints and data formats in Java.
A good practice when configuring options is to use Property Placeholders.
Property placeholders provide a few benefits:
-
They help prevent using hardcoded URLs, port numbers, sensitive information, and other settings.
-
They allow externalizing the configuration from the code.
-
They help the code to become more flexible and reusable.
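To make the idea of property placeholders concrete, the following sketch resolves `{{key}}` placeholders in an endpoint URI from a `java.util.Properties` object. It is a simplified illustration in plain Java, not Camel's own resolver; the class name and the property keys `cache.name` and `infinispan.hosts` are hypothetical.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified stand-in for placeholder resolution; Camel's real resolver supports more features.
final class PlaceholderSketch {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\{\\{([^}]+)\\}\\}");

    // Replaces every {{key}} in the template with the matching property value.
    static String resolve(String template, Properties props) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuilder out = new StringBuilder();
        while (matcher.find()) {
            String key = matcher.group(1).trim();
            String value = props.getProperty(key);
            if (value == null) {
                throw new IllegalArgumentException("No value for placeholder: " + key);
            }
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("cache.name", "myCacheName");          // hypothetical key
        props.setProperty("infinispan.hosts", "localhost:11222"); // hypothetical key
        System.out.println(resolve("infinispan:{{cache.name}}?hosts={{infinispan.hosts}}", props));
    }
}
```

Keeping the host and cache name in properties means the same route code can run against different environments without changes.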
The following two sections list all the options, first for the component and then for the endpoint.
Component Options
The Infinispan component supports 33 options, which are listed below.
Endpoint Options
The Infinispan endpoint is configured using URI syntax:
infinispan:cacheName
With the following path and query parameters:
Query Parameters (33 parameters)
Message Headers
The Infinispan component supports 18 message headers, which are listed below:
| Name | Description | Default | Type |
|---|---|---|---|
| CamelInfinispanEventType (consumer) | The type of the received event. | | String |
| CamelInfinispanCacheName (common) | The cache participating in the operation or event. | | String |
| CamelInfinispanKey (common) | The key to perform the operation to or the key generating the event. | | Object |
| CamelInfinispanValue (producer) | The value to use for the operation. | | Object |
| CamelInfinispanDefaultValue (producer) | The default value to use for a getOrDefault. | | Object |
| CamelInfinispanOldValue (producer) | The old value to use for a replace. | | Object |
| CamelInfinispanMap (producer) | A Map to use in case of CamelInfinispanOperationPutAll operation. | | Map |
| CamelInfinispanOperation (producer) | The operation to perform. | | InfinispanOperation |
| CamelInfinispanOperationResult (producer) | The name of the header whose value is the result. | | String |
| CamelInfinispanOperationResultHeader (producer) | Store the operation result in a header instead of the message body. | | String |
| CamelInfinispanLifespanTime (producer) | The Lifespan time of a value inside the cache. Negative values are interpreted as infinity. | | long |
| CamelInfinispanTimeUnit (producer) | The Time Unit of an entry Lifespan Time. | | TimeUnit |
| CamelInfinispanMaxIdleTime (producer) | The maximum amount of time an entry is allowed to be idle for before it is considered as expired. | | long |
| CamelInfinispanMaxIdleTimeUnit (producer) | The Time Unit of an entry Max Idle Time. | | TimeUnit |
| CamelInfinispanEventData (consumer) | The event data. | | Object |
| CamelInfinispanQueryBuilder (producer) | The QueryBuilder to use for QUERY command, if not present the command defaults to InfinispanConfiguration's one. | | InfinispanQueryBuilder |
| CamelInfinispanEntryVersion (consumer) | Provides access to the version of the created cache entry. | | long |
| CamelInfinispanCommandRetried (consumer) | This will be true if the write command that caused this had to be retried again due to a topology change. | | boolean |
Usage
Camel Operations
This section lists all available operations, along with their header information.
| Operation Name | Description |
|---|---|
| InfinispanOperation.PUT | Put a key/value pair in the cache, optionally with expiration |
| InfinispanOperation.PUTASYNC | Asynchronously puts a key/value pair in the cache, optionally with expiration |
| InfinispanOperation.PUTIFABSENT | Put a key/value pair in the cache if it did not exist, optionally with expiration |
| InfinispanOperation.PUTIFABSENTASYNC | Asynchronously puts a key/value pair in the cache if it did not exist, optionally with expiration |
- Required Headers:
  - CamelInfinispanKey
  - CamelInfinispanValue
- Optional Headers:
  - CamelInfinispanLifespanTime
  - CamelInfinispanLifespanTimeUnit
  - CamelInfinispanMaxIdleTime
  - CamelInfinispanMaxIdleTimeUnit
- Result Header:
  - CamelInfinispanOperationResult
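The difference between an unconditional put and a put-if-absent can be sketched with a plain `ConcurrentHashMap` standing in for the remote cache. This is an illustration of the map-style semantics only, under the assumption that the cache operations behave like the corresponding `java.util.concurrent.ConcurrentMap` methods; it does not use Camel or a Hot Rod connection, and the class name is hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Local stand-in for a cache; illustrates put versus put-if-absent semantics only.
final class PutSemanticsSketch {

    // Returns the value stored under "123" after a put followed by two put-if-absent calls.
    static String putThenPutIfAbsent(ConcurrentMap<String, String> cache) {
        cache.put("123", "first");          // unconditional write
        cache.putIfAbsent("123", "second"); // ignored because the key already exists
        cache.putIfAbsent("456", "third");  // stored because the key was absent
        return cache.get("123");
    }

    public static void main(String[] args) {
        ConcurrentMap<String, String> cache = new ConcurrentHashMap<>();
        System.out.println(putThenPutIfAbsent(cache));
        System.out.println(cache);
    }
}
```

Unlike this local map, a remote cache does not return the previous value from write methods by default.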
| Operation Name | Description |
|---|---|
| InfinispanOperation.PUTALL | Adds multiple entries to a cache, optionally with expiration |
| InfinispanOperation.PUTALLASYNC | Asynchronously adds multiple entries to a cache, optionally with expiration |
- Required Headers:
  - CamelInfinispanMap
- Optional Headers:
  - CamelInfinispanLifespanTime
  - CamelInfinispanLifespanTimeUnit
  - CamelInfinispanMaxIdleTime
  - CamelInfinispanMaxIdleTimeUnit
| Operation Name | Description |
|---|---|
| InfinispanOperation.GET | Retrieve the value associated with a specific key from the cache |
| InfinispanOperation.GETORDEFAULT | Retrieves the value, or default value, associated with a specific key from the cache |
- Required Headers:
  - CamelInfinispanKey
The resulting value is returned in the exchange body.
| Operation Name | Description |
|---|---|
| InfinispanOperation.CONTAINSKEY | Determines whether a cache contains a specific key |
- Required Headers:
  - CamelInfinispanKey
- Result Header:
  - CamelInfinispanOperationResult
| Operation Name | Description |
|---|---|
| InfinispanOperation.CONTAINSVALUE | Determines whether a cache contains a specific value |
- Required Headers:
  - CamelInfinispanValue
- Result Header:
  - CamelInfinispanOperationResult
| Operation Name | Description |
|---|---|
| InfinispanOperation.REMOVE | Removes an entry from a cache, optionally only if the value matches a given one |
| InfinispanOperation.REMOVEASYNC | Asynchronously removes an entry from a cache, optionally only if the value matches a given one |
- Required Headers:
  - CamelInfinispanKey
- Optional Headers:
  - CamelInfinispanValue
- Result Header:
  - CamelInfinispanOperationResult
| Operation Name | Description |
|---|---|
| InfinispanOperation.REPLACE | Conditionally replaces an entry in the cache, optionally with expiration |
| InfinispanOperation.REPLACEASYNC | Asynchronously conditionally replaces an entry in the cache, optionally with expiration |
- Required Headers:
  - CamelInfinispanKey
  - CamelInfinispanValue
  - CamelInfinispanOldValue
- Optional Headers:
  - CamelInfinispanLifespanTime
  - CamelInfinispanLifespanTimeUnit
  - CamelInfinispanMaxIdleTime
  - CamelInfinispanMaxIdleTimeUnit
- Result Header:
  - CamelInfinispanOperationResult
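A conditional replace takes a key, the expected old value, and the new value. The sketch below shows that compare-and-replace behaviour with a plain `ConcurrentHashMap`, assuming the cache operation behaves like `ConcurrentMap.replace(key, oldValue, newValue)`. The class and method names are hypothetical, and no Camel or Infinispan API is involved.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Local stand-in for a cache; illustrates compare-and-replace semantics only.
final class ReplaceSemanticsSketch {

    // Replaces the entry only when its current value equals expectedOld; returns whether it did.
    static boolean conditionalReplace(ConcurrentMap<String, String> cache,
                                      String key, String expectedOld, String newValue) {
        return cache.replace(key, expectedOld, newValue);
    }

    public static void main(String[] args) {
        ConcurrentMap<String, String> cache = new ConcurrentHashMap<>();
        cache.put("123", "v1");
        // The expected old value does not match, so the entry is left unchanged.
        System.out.println(conditionalReplace(cache, "123", "stale", "v2"));
        // The expected old value matches, so the entry is replaced.
        System.out.println(conditionalReplace(cache, "123", "v1", "v2"));
        System.out.println(cache.get("123"));
    }
}
```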
| Operation Name | Description |
|---|---|
| InfinispanOperation.CLEAR | Clears the cache |
| InfinispanOperation.CLEARASYNC | Asynchronously clears the cache |
| Operation Name | Description |
|---|---|
| InfinispanOperation.SIZE | Returns the number of entries in the cache |
- Result Header:
  - CamelInfinispanOperationResult
| Operation Name | Description |
|---|---|
| InfinispanOperation.STATS | Returns statistics about the cache |
- Result Header:
  - CamelInfinispanOperationResult
| Operation Name | Description |
|---|---|
| InfinispanOperation.QUERY | Executes a query on the cache |
- Required Headers:
  - CamelInfinispanQueryBuilder
The resulting value is returned in the exchange body.
| Write methods like put(key, value) and remove(key) do not return the previous value by default. |
Examples
-
Put a key/value into a named cache:
from("direct:start")
    .setHeader(InfinispanConstants.OPERATION).constant(InfinispanOperation.PUT) (1)
    .setHeader(InfinispanConstants.KEY).constant("123") (2)
    .to("infinispan:myCacheName?cacheContainer=#cacheContainer"); (3)
(1) Set the operation to perform
(2) Set the key used to identify the element in the cache
(3) Use the configured cache manager cacheContainer from the registry to put an element to the cache named myCacheName
It is possible to configure the lifetime and/or the idle time before the entry expires and gets evicted from the cache, for example:
from("direct:start")
    .setHeader(InfinispanConstants.OPERATION).constant(InfinispanOperation.PUT)
    .setHeader(InfinispanConstants.KEY).constant("123")
    .setHeader(InfinispanConstants.LIFESPAN_TIME).constant(100L) (1)
    .setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT).constant(TimeUnit.MILLISECONDS.toString()) (2)
    .to("infinispan:myCacheName");
(1) Set the lifespan of the entry
(2) Set the time unit for the lifespan
Queries
from("direct:start")
    .setHeader(InfinispanConstants.OPERATION).constant(InfinispanOperation.QUERY)
    .setHeader(InfinispanConstants.QUERY_BUILDER).constant(new InfinispanQueryBuilder() {
        @Override
        public Query build(QueryFactory<Query> qf) {
            return qf.from(User.class).having("name").like("%abc%").build();
        }
    })
    .to("infinispan:myCacheName?cacheContainer=#cacheManager");
The .proto descriptors for domain objects must be registered with the remote Data Grid server; see Remote Query Example in the official Infinispan documentation.
-
Custom Listeners
from("infinispan://?cacheContainer=#cacheManager&customListener=#myCustomListener")
    .to("mock:result");
The instance of myCustomListener must exist and Camel should be able to look it up from the Registry. Users are encouraged to extend the org.apache.camel.component.infinispan.remote.InfinispanRemoteCustomListener class and annotate the resulting class with @ClientListener, which can be found in the package org.infinispan.client.hotrod.annotation.
Using the Infinispan based idempotent repository
In this section, we will use the Infinispan based idempotent repository.
-
Java
-
XML
InfinispanRemoteConfiguration conf = new InfinispanRemoteConfiguration(); (1)
conf.setHosts("localhost:11222");
InfinispanRemoteIdempotentRepository repo = new InfinispanRemoteIdempotentRepository("idempotent"); (2)
repo.setConfiguration(conf);
context.addRoutes(new RouteBuilder() {
    @Override
    public void configure() {
        from("direct:start")
            .idempotentConsumer(header("MessageID"), repo) (3)
            .to("mock:result");
    }
});
(1) Configure the cache
(2) Configure the repository bean
(3) Set the repository to the route
<bean id="infinispanRepo" class="org.apache.camel.component.infinispan.remote.InfinispanRemoteIdempotentRepository" destroy-method="stop">
  <constructor-arg value="idempotent"/> (1)
  <property name="configuration"> (2)
    <bean class="org.apache.camel.component.infinispan.remote.InfinispanRemoteConfiguration">
      <property name="hosts" value="localhost:11222"/>
    </bean>
  </property>
</bean>
<camelContext xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start" />
    <idempotentConsumer idempotentRepository="infinispanRepo"> (3)
      <header>MessageID</header>
      <to uri="mock:result" />
    </idempotentConsumer>
  </route>
</camelContext>
(1) Set the name of the cache that will be used by the repository
(2) Configure the repository bean
(3) Set the repository to the route
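What an idempotent repository contributes to the route can be sketched in a few lines of plain Java: the first time a message ID is offered it is recorded and accepted, and later occurrences are rejected as duplicates. The sketch below uses an in-memory set instead of an Infinispan cache, and the class and method names are hypothetical, so it illustrates the idea rather than the behaviour of InfinispanRemoteIdempotentRepository itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// In-memory stand-in for an idempotent repository; the real one keeps IDs in a remote cache.
final class IdempotentFilterSketch {

    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    // Returns true the first time a message ID is offered and false for duplicates.
    boolean firstTime(String messageId) {
        return seen.add(messageId);
    }

    // Keeps only the first occurrence of every message ID, preserving order.
    static List<String> deduplicate(List<String> messageIds) {
        IdempotentFilterSketch repo = new IdempotentFilterSketch();
        List<String> accepted = new ArrayList<>();
        for (String id : messageIds) {
            if (repo.firstTime(id)) {
                accepted.add(id);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(deduplicate(List.of("a", "b", "a", "c", "b")));
    }
}
```

Backing the set with a remote cache lets several Camel instances share the same view of which message IDs were already processed.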
Using the Infinispan based aggregation repository
In this section, we will use the Infinispan based aggregation repository.
-
Java
-
XML
InfinispanRemoteConfiguration conf = new InfinispanRemoteConfiguration(); (1)
conf.setHosts("localhost:11222");
InfinispanRemoteAggregationRepository repo = new InfinispanRemoteAggregationRepository(); (2)
repo.setCacheName("aggregation");
repo.setConfiguration(conf);
context.addRoutes(new RouteBuilder() {
    @Override
    public void configure() {
        from("direct:start")
            .aggregate(header("MessageID"))
            .completionSize(3)
            .aggregationRepository(repo) (3)
            .aggregationStrategy("myStrategy")
            .to("mock:result");
    }
});
(1) Configure the cache
(2) Create the repository bean
(3) Set the repository to the route
<bean id="infinispanRepo" class="org.apache.camel.component.infinispan.remote.InfinispanRemoteAggregationRepository" destroy-method="stop">
  <constructor-arg value="aggregation"/> (1)
  <property name="configuration"> (2)
    <bean class="org.apache.camel.component.infinispan.remote.InfinispanRemoteConfiguration">
      <property name="hosts" value="localhost:11222"/>
    </bean>
  </property>
</bean>
<camelContext xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start" />
    <aggregate aggregationStrategy="myStrategy"
               completionSize="3"
               aggregationRepository="infinispanRepo"> (3)
      <correlationExpression>
        <header>MessageID</header>
      </correlationExpression>
      <to uri="mock:result"/>
    </aggregate>
  </route>
</camelContext>
(1) Set the name of the cache that will be used by the repository
(2) Configure the repository bean
(3) Set the repository to the route
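The role of the aggregation repository in the routes above can be pictured as a store of in-progress groups keyed by the correlation value. The following plain-Java sketch collects message bodies per key and releases a group once it reaches the completion size of 3. It is a simplification with hypothetical names: a real repository stores the partially aggregated exchange produced by the aggregation strategy, and it does so in a remote cache rather than a local map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// In-memory stand-in for an aggregation repository combined with a size-based completion rule.
final class AggregationSketch {

    private final Map<String, List<String>> pending = new HashMap<>();
    private final int completionSize;

    AggregationSketch(int completionSize) {
        this.completionSize = completionSize;
    }

    // Adds a body to the group for its correlation key; returns the group once it is complete.
    Optional<List<String>> add(String correlationKey, String body) {
        List<String> group = pending.computeIfAbsent(correlationKey, k -> new ArrayList<>());
        group.add(body);
        if (group.size() < completionSize) {
            return Optional.empty();
        }
        pending.remove(correlationKey); // a completed group leaves the store
        return Optional.of(group);
    }

    public static void main(String[] args) {
        AggregationSketch aggregator = new AggregationSketch(3);
        System.out.println(aggregator.add("order-1", "a"));
        System.out.println(aggregator.add("order-1", "b"));
        System.out.println(aggregator.add("order-1", "c"));
    }
}
```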
Embedding store support with camel-langchain4j-embeddings
This component provides the capability to store and query vector embeddings. To activate this functionality, add camel-langchain4j-embeddings to your project.
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-langchain4j-embeddings</artifactId>
<!-- use the same version as your Camel core version -->
<version>x.x.x</version>
</dependency>
If you want to disable this functionality, set the embeddingStoreEnabled option to false.
To store an embedding:
from("direct:put")
    // Create an embedding from the message body
    .to("langchain4j-embeddings:create")
    .setHeader(InfinispanConstants.OPERATION).constant(InfinispanOperation.PUT)
    // Transform the embedding to a format usable by Infinispan
    .transformDataType(new DataType("infinispan:embeddings"))
    // Store the embedding
    .to("infinispan:myCache?embeddingStoreDimension=384");
The embeddingStoreDimension option must be specified. It must also match the dimension of the embedding model used by the langchain4j-embeddings endpoint.
To query embeddings:
from("direct:query")
    // Create an embedding from the message body
    .to("langchain4j-embeddings:create")
    .setHeader(InfinispanConstants.OPERATION).constant(InfinispanOperation.QUERY)
    // Transforms the embedding to a vector kNN search
    .transformDataType(new DataType("infinispan:embeddings"))
    // Query embeddings
    .to("infinispan:myCache?embeddingStoreDimension=384");
By default, a simple vector kNN search query is executed that looks like the following.
select i, score(i) from InfinispanRemoteEmbedding i
where i.embedding <-> [the vector embedding]~3
The ~3 part determines the distance from the search vector embedding, in relation to the configured vector similarity. These can be modified via the embeddingStoreDistance and embeddingStoreVectorSimilarity options.
The result of the query operation is List<Object[]>. The elements of the Object array are as follows.
| Index | Type | Description |
|---|---|---|
| 0 | | Information about the embedding such as the embedding text and its |
| 1 | | The score in relation to how closely the embedding matches the search text. |
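Reading the query result comes down to indexing into each Object array. The sketch below walks a `List<Object[]>` and extracts the score at index 1. It is plain Java with hypothetical sample rows and a hypothetical class name, and it assumes the score is a numeric value, which the table above does not state explicitly.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrates how a List<Object[]> query result could be unpacked.
final class EmbeddingResultSketch {

    // Extracts the score (index 1) of every row as a double.
    static List<Double> scores(List<Object[]> rows) {
        List<Double> out = new ArrayList<>();
        for (Object[] row : rows) {
            out.add(((Number) row[1]).doubleValue()); // assumption: the score is numeric
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical rows; a real row would hold the embedding information at index 0.
        List<Object[]> rows = List.of(
                new Object[] {"embedding-a", 0.5f},
                new Object[] {"embedding-b", 0.25f});
        for (Object[] row : rows) {
            System.out.println(row[0] + " scored " + row[1]);
        }
        System.out.println(scores(rows));
    }
}
```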
| With the release of Infinispan 11, it is required to set the encoding configuration on any cache created. This is critical for consuming events too. For more information, have a look at Data Encoding and MediaTypes in the official Infinispan documentation. |
Spring Boot Auto-Configuration
When using Infinispan with Spring Boot, make sure to use the following Maven dependency to have support for auto-configuration:
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-infinispan-starter</artifactId>
<version>x.x.x</version>
<!-- use the same version as your Camel core version -->
</dependency>
The component supports 29 options, which are listed below.