Skip to content

Commit

Permalink
Stored procedure support
Browse files Browse the repository at this point in the history
Add field name renaming in the register map.

Change of RiverSource API, the SQL command is passed through the result set processing
because it carries the callable statement register map information for the field
names.
  • Loading branch information
jprante committed Aug 5, 2014
1 parent 247a6f5 commit c934b05
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 62 deletions.
62 changes: 57 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ bulk mode ensures high throughput when indexing to Elasticsearch.

| Elasticsearch version | Plugin | Release date |
| ------------------------ | -----------| -------------|
| 1.3.1 | 1.3.0.4 | Aug 5, 2014 |
| 1.3.1 | 1.3.0.3 | Aug 4, 2014 |
| 1.3.1 | 1.3.0.2 | Aug 2, 2014 |
| 1.3.1 | 1.3.0.1 | Jul 31, 2014 |
Expand All @@ -60,7 +61,7 @@ bulk mode ensures high throughput when indexing to Elasticsearch.

## Installation

./bin/plugin --install jdbc --url http://xbib.org/repository/org/xbib/elasticsearch/plugin/elasticsearch-river-jdbc/1.3.0.3/elasticsearch-river-jdbc-1.3.0.3-plugin.zip
./bin/plugin --install jdbc --url http://xbib.org/repository/org/xbib/elasticsearch/plugin/elasticsearch-river-jdbc/1.3.0.4/elasticsearch-river-jdbc-1.3.0.4-plugin.zip

Do not forget to restart the node after installing.

Expand All @@ -75,6 +76,7 @@ Change into this directory to invoke the `./bin/plugin` command line tool.

| File | SHA1 |
| ---------------------------------------------| -----------------------------------------|
| elasticsearch-river-jdbc-1.3.0.4-plugin.zip | dcb412285f6274ef07c05068311dacb745fe8046 |
| elasticsearch-river-jdbc-1.3.0.3-plugin.zip | 7e3fe518c716305a7878fddb299f0c263fb5ed4b |
| elasticsearch-river-jdbc-1.3.0.2-plugin.zip | 7f87af3055223d15238da9c81ae95ff6ea0ce934 |
| elasticsearch-river-jdbc-1.3.0.1-plugin.zip | ee58c51acfb4bc2294939c655ff2f790890808bc |
Expand Down Expand Up @@ -231,8 +233,7 @@ Example:
"sql" : [
{
"statement" : "select ... from ... where a = ?, b = ?, c = ?",
"parameter" : [ "value for a", "value for b", "value for c" ],
"callable" : false
"parameter" : [ "value for a", "value for b", "value for c" ]
},
{
"statement" : ...
Expand All @@ -241,6 +242,8 @@ Example:

`sql.statement` - the SQL statement

`sql.callable` - boolean flag, if true, the SQL statement is interpreted as a JDBC CallableStatement for stored procedures (default: false).

`sql.parameter` - bind parameters for the SQL statement (in order). Some special values can be used with the following meanings:

* `$now` - the current timestamp
Expand All @@ -255,8 +258,6 @@ Example:
* `$river.state.timestamp` - last timestamp of river activity (from river state)
* `$river.state.counter` - counter from river state, counts the numbers of runs

`sql.callable` - boolean flag, if true, the SQL statement is interpreted as a JDBC CallableStatement (default: false). Note: callable statement support is experimental and not well tested.

`locale` - the default locale (used for parsing numerical values, floating point character. Recommended values is "en_US")

`timezone` - the timezone for JDBC setTimestamp() calls when binding parameters with timestamp values
Expand Down Expand Up @@ -694,6 +695,57 @@ will result into the following JSON documents
id=<random> {"product":"Apples","created":1338501600000,"department":"German Fruits","quantity":2,"customer":"Good"}
id=<random> {"product":"Oranges","created":1338501600000,"department":"English Fruits","quantity":3,"customer":"Bad"}

# Stored procedures or callable statements

Stored procedures can also be used for fetchng data, like this example fo MySQL illustrates.
See also [Using Stored Procedures](http://docs.oracle.com/javase/tutorial/jdbc/basics/storedprocedures.html)
from where the example is taken.

create procedure GET_SUPPLIER_OF_COFFEE(
IN coffeeName varchar(32),
OUT supplierName varchar(40))
begin
select SUPPLIERS.SUP_NAME into supplierName
from SUPPLIERS, COFFEES
where SUPPLIERS.SUP_ID = COFFEES.SUP_ID
and coffeeName = COFFEES.COF_NAME;
select supplierName;
end

Now it is possible to call the procedure from the JDBC plugin and index the result in Elasticsearch.

{
"jdbc" : {
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : [
{
"callable" : true,
"statement" : "{call GET_SUPPLIER_OF_COFFEE(?,?)}",
"parameter" : [
"Colombian"
],
"register" : {
"mySupplierName" : { "pos" : 2, "type" : "varchar" }
}
}
],
"index" : "my_jdbc_river_index",
"type" : "my_jdbc_river_type"
}
}

Note, the `parameter` lists the input parameters in the order they should be applied, like in an
ordinary statement. The `register` declares a list of output parameters in the particular order
the `pos` number indicates. It is required to declare the JDBC type in the `type` attribute.
`mySupplierName`, the key of the output parameter, is used as the Elasticsearch field name specification,
like the column name specification in an ordinary SQL statement, because column names are not available
in callable statement result sets.

If there is more than one result sets returned by a callable statement,
the JDBC plugin enters a loop and iterates through all result sets.

# Monitoring the JDBC river state

While a river/feed is running, you can monitor the activity by using the `_state` command.
Expand Down
28 changes: 26 additions & 2 deletions src/main/java/org/xbib/elasticsearch/river/jdbc/RiverSource.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.xbib.elasticsearch.river.jdbc;

import org.xbib.elasticsearch.plugin.jdbc.RiverContext;
import org.xbib.elasticsearch.plugin.jdbc.SQLCommand;
import org.xbib.keyvalue.KeyValueStreamListener;

import java.io.IOException;
Expand Down Expand Up @@ -156,6 +157,20 @@ public interface RiverSource {

void beforeRows(ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException;

boolean nextRow(ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException;

void afterRows(ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException;

/**
* This routine is executed before the result set is evaluated
* @param command the SQL command that created this result set
* @param results the result set
* @param listener listener for the key/value stream generated from the result set
* @throws SQLException
* @throws IOException
*/
void beforeRows(SQLCommand command, ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException;

/**
* Action for the next row of the result set to be processed
*
Expand All @@ -165,9 +180,18 @@ public interface RiverSource {
* @throws SQLException when SQL execution gives an error
* @throws IOException when input/output error occurs
*/
boolean nextRow(ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException;
boolean nextRow(SQLCommand command, ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException;

void afterRows(ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException;
/**
* After the result set is processed, this method is called.
*
* @param command the SQL command that created this result set
* @param results the result set
* @param listener listener for the key/value stream generated from the result set
* @throws SQLException
* @throws IOException
*/
void afterRows(SQLCommand command, ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException;

/**
* Parse a value in a row column
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private void fetch(Connection connection, SQLCommand command, OpInfo opInfo, Tim
KeyValueStreamListener<Object, Object> listener =
new ColumnKeyValueStreamListener<Object, Object>(opInfo.opType)
.output(context.getRiverMouth());
merge(result, listener);
merge(command, result, listener);
} catch (Exception e) {
throw new IOException(e);
} finally {
Expand Down
Loading

0 comments on commit c934b05

Please sign in to comment.