
Add a pre-select query to inbound JDBC adapter [INT-1228] #5237


Closed
spring-operator opened this issue Jul 4, 2010 · 17 comments
Labels: in: core · status: declined (There won't be a fix for some reason) · type: enhancement

Comments


Brian Dussault opened INT-1228 and commented

The jdbc adapter is a great addition to Spring Integration. I would like to submit an enhancement to the existing adapter... A fairly common scenario in a multi-node integration environment is to have polling jdbc adapters running concurrently across the farm. The adapter currently allows for a select query to obtain the records and an update to mark the records as picked up. When running concurrently across an integration farm, it would be desirable to have a "pre-select" update statement to mark the records as being worked on by a specific node (or thread).

Example:
<jdbc:inbound-channel-adapter pre-update="update item set status=2, lock_node='mynode'"
        query="select * from item where status=2 and lock_node='mynode'"
        channel="target" data-source="dataSource"
        update="update item set status=10 where id in (:idList)" />

Thanks for the consideration


Affects: 2.0 M5

Attachments:


Brian Dussault commented

Title should read: Add a "pre select" update option to JDBC adapter


Brian Dussault commented

Use Cases


Brian Dussault commented

Sample SQL


Dave Syer commented

I think you should be able to avoid duplicate messages just by locking the rows as you read them (i.e. no need for additional query). Or maybe I'm misunderstanding something about your use case. I looked at the forum post and that seems to be the gist of Iwein's argument as well.

You get platform-independent locking without SQL dialect issues, by the way, if you use isolation=REPEATABLE_READ (or, to be absolutely sure on all platforms, SERIALIZABLE) in the poller's transaction.
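
For illustration, here is a minimal sketch of that suggestion: an inbound JDBC adapter whose poller runs in a SERIALIZABLE transaction. The namespace prefixes, bean names (transactionManager, dataSource), table, and queries are assumptions for the example, not taken from the report:

<int-jdbc:inbound-channel-adapter channel="target" data-source="dataSource"
        query="select * from item where status = 2"
        update="update item set status = 10 where id in (:idList)">
    <!-- the poll (select + update) runs inside one SERIALIZABLE transaction -->
    <int:poller fixed-rate="5000">
        <int:transactional transaction-manager="transactionManager"
                isolation="SERIALIZABLE"/>
    </int:poller>
</int-jdbc:inbound-channel-adapter>

The idea is that the database, not the application, keeps two concurrent pollers from both reading and marking the same rows, so no extra "pre-select" statement is needed.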


Brian Dussault commented

Dave,
Thanks for the feedback.

My general thought here was that instead of having to leave a physical database lock open for multiple Spring Integration endpoints, we could mark the records using a logical lock and apply a compensating transaction if anything downstream failed. The goal was to minimize the amount of time that rows need to be physically locked.
A secondary benefit is that this approach allows for one object (unique by key) per thread. The first query ensures that no other threads are working on a particular object. This helps maintain processing order while maximizing concurrency. This is similar in functionality to ActiveMQ’s Message Group feature: http://activemq.apache.org/message-groups.html

LOCK QUERY (prevents 2 nodes from operating on the same object):
• update items set PROCESS_STATE ='In-process', NODE_THREAD='node1-thread1', last_updated=systimestamp where node_thread is null and process_state = 'Created' and KEY NOT IN (select KEY from ITEMS where NODE_THREAD NOT IN ('node1-thread1') and PROCESS_STATE='In-process')

READ QUERY:
• select distinct(key) from items where PROCESS_STATE = 'In-process' and NODE_THREAD = 'node1-thread1'

UPDATE QUERY:
• update items set PROCESS_STATE ='Complete', last_updated=systimestamp where NODE_THREAD='node1-thread1' and process_state = 'In-process';

Thoughts?
Brian


Dave Syer commented

I like the fact that the name of the "pre-select" query has changed to "lock" (at least that tells me what you intend to use it for). But I don't like the example yet. Your example is more of a "reservation" than a "lock" (you called it a "logical lock"), and it reserves the whole table using a fairly scary looking query. It also requires a second transaction, if I understand what you are saying. So you could implement it easily yourself with a simple service that executes the "lock" query periodically, completely independent of the Spring Integration adapter.

The analogy with ActiveMQ is not very accurate because a message group there is a business grouping of messages by the producer, and the producer is nowhere to be seen in the example we are discussing. If your real use case is to assign messages to consumers according to a business key (I think it is based on what I know about your project), then we need to change the focus. The problem then is how to assign a business key to a consumer in a way that shares keys among consumers fairly and exclusively, and across restarts.

If all you need is a lock to prevent duplicate processing I still think you can do it already (but I would like to see it in action before we draw a line under it).
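
As a rough sketch of the "simple service" idea above (everything here is hypothetical: the ItemReservationService class, its reserveBatch method, and the schedule are not part of this discussion), the reservation query could be scheduled outside the adapter and commit independently of the polling flow:

<task:scheduler id="taskScheduler" pool-size="1"/>

<task:scheduled-tasks scheduler="taskScheduler">
    <!-- periodically runs the "lock"/reservation update in its own transaction -->
    <task:scheduled ref="itemReservationService" method="reserveBatch" fixed-rate="5000"/>
</task:scheduled-tasks>

<!-- hypothetical bean wrapping a JdbcTemplate that executes the reservation update -->
<bean id="itemReservationService" class="com.example.ItemReservationService">
    <constructor-arg ref="dataSource"/>
</bean>

The inbound adapter's select would then only filter on NODE_THREAD and PROCESS_STATE, as in the read query shown earlier.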


Brian Dussault commented

I was thinking that the lock update (reservation) could be executed by the adapter at each polling interval. This would mark a "micro" batch of records as "reserved" by a specific node in the cluster. This does imply an immediate commit on this statement by the adapter. At this point a second transaction would begin and run the select statement, which would ensure that no two nodes are processing the same object key. The analogy to the Message Groups was more of a functional one from the consumer side; ActiveMQ ensures that the same consumer (thread) that has processed a key (for example customerId=12345) will receive all future updates for customerId=12345. It is a loose analogy, but the behavior the consumer sees in the jdbc adapter would be similar (two nodes/threads would never see the same customerId=12345) while processing.

I think you have captured the essence of what I'm "up to" with your statement: "The problem then is how to assign a business key to a consumer in a way that shares keys among consumers fairly and exclusively, and across restarts".

I'm definitely open to alternative approaches.


Brian Dussault commented

BTW, I think your max-rows attribute suggestion is a good one. Not sure if this was opened as a feature request yet; if not, let me know if you want me to add it to JIRA.


Dave Syer commented

Feel free to open a JIRA about the max-rows feature. I'll work on a prototype for the business key sharing and get back to you.


Dave Syer commented

I created a GIST http://gist.github.com/472357 with a working prototype of a business key partitioning consumer. Only works on Oracle right now, but it's easy to adapt for other platforms. If you try it and it works I'd like to know.

It works by using isolation=SERIALIZABLE to lock rows as they are polled. The way it is implemented only scales well if the business keys are uniformly distributed, but there are obvious customizations that would fix that problem.

Curiously, for me at least, I had to include multiple inbound adapters to get it to scale, which seems odd, but that's another topic of discussion really, and maybe another feature request.


Brian Dussault commented

Dave,
I'll take a look tonight and get feedback to you ASAP.

Thanks,
Brian


Brian Dussault commented

Dave,
Is the spring integration jdbc snapshot your code refers to available at a maven snapshot repository?

Thanks,
Brian


Brian Dussault commented

Dave,

I tried the code and it works well and exhibits the desired behavior I was looking for: business key partitioning. I also tried the select-for-update approach with a gnarly Oracle query to compare performance. For 10k records I was seeing approx 70ms per item (on a local dev system). The performance difference for "select for update" vs. a transaction isolation level of SERIALIZABLE was nominal (within 5 ms).

While doing this research and trying a few other approaches, I realized a need for a jdbc-gateway (bi-directional). Here is a realistic use case (IMHO), with a configuration sketch after the list:

  1. Receive object keys on a channel ("key-channel")
  2. jdbc-gateway listens on the "key-channel", fetches the payload (via jdbc) for the object key, and puts the message onto an output channel
  3. output channel writes to target system
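
Purely as a sketch of what the proposed gateway might look like in configuration (the element name and attributes below are invented for illustration; no such component exists yet):

<int-jdbc:outbound-gateway request-channel="key-channel" reply-channel="output"
        data-source="dataSource"
        query="select * from item where key = :payload"/>

The gateway would execute the query with the inbound key as a parameter and send the resulting row(s) as the reply message on the output channel.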

If you think this is a reasonable requirement, I will open a new JIRA to cover this feature. Please feel free to mark this issue "resolved".

Thanks for the well thought out test case!


Dave Syer commented

You are welcome.

I don't understand the gateway use case yet, but feel free to open a new issue on that topic, and we can continue the discussion there.


Gary Russell commented (Sep 17, 2010)

The gist needs the following update to work with the current snapshot...

$ git diff
diff --git a/src/main/resources/META-INF/spring/app-context.xml b/src/main/resources/META-INF/spring/app-context.xml
index 032b544..9a9ef16 100644
--- a/src/main/resources/META-INF/spring/app-context.xml
+++ b/src/main/resources/META-INF/spring/app-context.xml
@@ -23,20 +23,20 @@
        </poller>
 
        <int-jdbc:inbound-channel-adapter channel="target" query="${int.poll.query}"
-               sql-query-parameter-source="parameterSource" data-source="dataSource" max-rows-per-poll="2"
-               update="update item set status=10 where id in (:idList)" />
+               select-sql-parameter-source="parameterSource" data-source="dataSource" max-rows-per-poll="2"
+               update="update item set status=10 where id in (:id)" />
 
        <int-jdbc:inbound-channel-adapter channel="target" query="${int.poll.query}"
-               sql-query-parameter-source="parameterSource" data-source="dataSource" max-rows-per-poll="2"
-               update="update item set status=10 where id in (:idList)" />
+               select-sql-parameter-source="parameterSource" data-source="dataSource" max-rows-per-poll="2"
+               update="update item set status=10 where id in (:id)" />
 
        <int-jdbc:inbound-channel-adapter channel="target" query="${int.poll.query}"
-               sql-query-parameter-source="parameterSource" data-source="dataSource" max-rows-per-poll="2"
-               update="update item set status=10 where id in (:idList)" />
+               select-sql-parameter-source="parameterSource" data-source="dataSource" max-rows-per-poll="2"
+               update="update item set status=10 where id in (:id)" />
 
        <int-jdbc:inbound-channel-adapter channel="target" query="${int.poll.query}"
-               sql-query-parameter-source="parameterSource" data-source="dataSource" max-rows-per-poll="2"
-               update="update item set status=10 where id in (:idList)" />
+               select-sql-parameter-source="parameterSource" data-source="dataSource" max-rows-per-poll="2"
+               update="update item set status=10 where id in (:id)" />
 
        <int-jdbc:outbound-channel-adapter channel="insert" query="${int.insert.query}"
                data-source="dataSource" />

Also needs a build snapshot of SI that includes the fix to #5459
