Skip to content

Commit 80433dd

Browse files
committed
Update NGSIToPostgres for adding new aattributes
1 parent c42c3de commit 80433dd

6 files changed

Lines changed: 454 additions & 16 deletions

File tree

Lines changed: 293 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,293 @@
1+
package org.apache.nifi.processors.ngsi;
2+
3+
import org.apache.nifi.annotation.behavior.InputRequirement;
4+
import org.apache.nifi.annotation.behavior.SupportsBatching;
5+
import org.apache.nifi.annotation.documentation.CapabilityDescription;
6+
import org.apache.nifi.annotation.documentation.Tags;
7+
import org.apache.nifi.components.PropertyDescriptor;
8+
import org.apache.nifi.flowfile.FlowFile;
9+
import org.apache.nifi.logging.ComponentLog;
10+
import org.apache.nifi.processor.*;
11+
import org.apache.nifi.processor.exception.ProcessException;
12+
import org.apache.nifi.processor.util.StandardValidators;
13+
import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
14+
import org.apache.nifi.processors.ngsi.ngsi.backends.ckan.CkanBackend;
15+
import org.apache.nifi.processors.ngsi.ngsi.utils.Entity;
16+
import org.apache.nifi.processors.ngsi.ngsi.utils.NGSIEvent;
17+
import org.apache.nifi.processors.ngsi.ngsi.utils.NGSIUtils;
18+
import java.util.ArrayList;
19+
import java.util.HashSet;
20+
import java.util.List;
21+
import java.util.Set;
22+
23+
24+
@SupportsBatching
25+
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
26+
@Tags({"CKAN","ckan","sql", "put", "rdbms", "database", "create", "insert", "relational","NGSIv2", "NGSI","FIWARE"})
27+
@CapabilityDescription("Create a CKAN resource, package and dataset if not exits using the information coming from and NGSI event converted to flow file." +
28+
"After insert all of the vales of the flow file content extraction the entities and attributes")
29+
30+
31+
public class NGSIToCKAN extends AbstractProcessor {
32+
protected static final PropertyDescriptor CKAN_HOST = new PropertyDescriptor.Builder()
33+
.name("CKAN Host")
34+
.displayName("CKAN Host")
35+
.description("FQDN/IP address where the CKAN server runs. Default value is localhost")
36+
.required(true)
37+
.defaultValue("localhost")
38+
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
39+
.build();
40+
41+
protected static final PropertyDescriptor CKAN_PORT = new PropertyDescriptor.Builder()
42+
.name("CKAN Port")
43+
.displayName("CKAN Port")
44+
.description("Port where the CKAN server runs. Default value is 80")
45+
.required(true)
46+
.defaultValue("80")
47+
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
48+
.build();
49+
50+
protected static final PropertyDescriptor CKAN_VIEWER = new PropertyDescriptor.Builder()
51+
.name("CKAN Viewer")
52+
.displayName("CKAN Viewer")
53+
.description("The CKAN resource page can contain one or more visualizations of the resource data or file contents (a table, a bar chart, a map, etc). These are commonly referred to as resource views.")
54+
.required(true)
55+
.defaultValue("recline_grid_view")
56+
.allowableValues("recline_view", "recline_grid_view","recline_graph_view","recline_map_view","text_view","image_view","video_view","audio_view","webpage_view")
57+
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
58+
.build();
59+
60+
protected static final PropertyDescriptor CKAN_API_KEY = new PropertyDescriptor.Builder()
61+
.name("CKAN API Key")
62+
.displayName("CKAN API Key")
63+
.description("The APi Key you are going o use in CKAN")
64+
.required(true)
65+
.defaultValue("XXXXXX")
66+
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
67+
.build();
68+
69+
protected static final PropertyDescriptor ORION_URL = new PropertyDescriptor.Builder()
70+
.name("ORION URL")
71+
.displayName("ORION URL")
72+
.description("To be put as the filestore URL.\n")
73+
.required(true)
74+
.defaultValue(" http://localhost:1026")
75+
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
76+
.build();
77+
78+
protected static final PropertyDescriptor SSL = new PropertyDescriptor.Builder()
79+
.name("SSL")
80+
.displayName("SSL")
81+
.description("ssl for connection")
82+
.required(false)
83+
.defaultValue("false")
84+
.allowableValues("false", "true")
85+
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
86+
.build();
87+
88+
protected static final PropertyDescriptor DATA_MODEL = new PropertyDescriptor.Builder()
89+
.name("data-model")
90+
.displayName("Data Model")
91+
.description("The Data model for creating the tables when an event have been received you can choose between" +
92+
":db-by-service-path or db-by-entity for ngsiv2 and db-by-entity or db-by-entity-type for ngsi-ld, default value is db-by-entity")
93+
.required(false)
94+
.allowableValues("db-by-entity-id", "db-by-entity")
95+
.defaultValue("db-by-entity")
96+
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
97+
.build();
98+
99+
protected static final PropertyDescriptor ATTR_PERSISTENCE = new PropertyDescriptor.Builder()
100+
.name("attr-persistence")
101+
.displayName("Attribute Persistence")
102+
.description("The mode of storing the data inside of the table")
103+
.required(false)
104+
.allowableValues("row", "column")
105+
.defaultValue("row")
106+
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
107+
.build();
108+
109+
protected static final PropertyDescriptor NGSI_VERSION = new PropertyDescriptor.Builder()
110+
.name("ngsi-version")
111+
.displayName("NGSI Version")
112+
.description("The version of NGSI of your incomming events. You can choose Between v2 for NGSIv2 and ld for NGSI-LD ")
113+
.required(false)
114+
.allowableValues("v2","ld")
115+
.defaultValue("v2")
116+
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
117+
.build();
118+
119+
protected static final PropertyDescriptor DEFAULT_SERVICE = new PropertyDescriptor.Builder()
120+
.name("default-service")
121+
.displayName("Default Service")
122+
.description("Default Fiware Service for building the database name")
123+
.required(false)
124+
.defaultValue("test")
125+
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
126+
.build();
127+
128+
protected static final PropertyDescriptor DEFAULT_SERVICE_PATH = new PropertyDescriptor.Builder()
129+
.name("default-service-path")
130+
.displayName("Default Service path")
131+
.description("Default Fiware ServicePath for building the table name")
132+
.required(false)
133+
.defaultValue("/path")
134+
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
135+
.build();
136+
137+
protected static final PropertyDescriptor ENABLE_ENCODING= new PropertyDescriptor.Builder()
138+
.name("enable-encoding")
139+
.displayName("Enable Encoding")
140+
.description("true or false, true applies the new encoding, false applies the old encoding.")
141+
.required(false)
142+
.allowableValues("true", "false")
143+
.defaultValue("true")
144+
.build();
145+
146+
protected static final PropertyDescriptor ENABLE_LOWERCASE= new PropertyDescriptor.Builder()
147+
.name("enable-lowercase")
148+
.displayName("Enable Lowercase")
149+
.description("true or false, true for creating the Schema and Tables name with lowercase.")
150+
.required(false)
151+
.allowableValues("true", "false")
152+
.defaultValue("true")
153+
.build();
154+
155+
protected static final PropertyDescriptor TRANSACTION_TIMEOUT = new PropertyDescriptor.Builder()
156+
.name("Transaction Timeout")
157+
.description("If the <Support Fragmented Transactions> property is set to true, specifies how long to wait for all FlowFiles for a particular fragment.identifier attribute "
158+
+ "to arrive before just transferring all of the FlowFiles with that identifier to the 'failure' relationship")
159+
.required(false)
160+
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
161+
.build();
162+
163+
protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
164+
.name("Batch Size")
165+
.description("The preferred number of FlowFiles to put to the database in a single transaction")
166+
.required(true)
167+
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
168+
.defaultValue("10")
169+
.build();
170+
171+
protected static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder()
172+
.name("Max Connections")
173+
.description("Maximum number of connections allowed for a Http-based HDFS backend.")
174+
.required(true)
175+
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
176+
.defaultValue("500")
177+
.build();
178+
179+
protected static final PropertyDescriptor MAX_CONNECTIONS_PER_ROUTE = new PropertyDescriptor.Builder()
180+
.name("Max Connections per Route")
181+
.description("Maximum number of connections per route allowed for a Http-based HDFS backend.")
182+
.required(true)
183+
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
184+
.defaultValue("100")
185+
.build();
186+
187+
protected static final Relationship REL_SUCCESS = new Relationship.Builder()
188+
.name("success")
189+
.description("A FlowFile is routed to this relationship after the database is successfully updated")
190+
.build();
191+
protected static final Relationship REL_RETRY = new Relationship.Builder()
192+
.name("retry")
193+
.description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
194+
.build();
195+
protected static final Relationship REL_FAILURE = new Relationship.Builder()
196+
.name("failure")
197+
.description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail, "
198+
+ "such as an invalid query or an integrity constraint violation")
199+
.build();
200+
201+
202+
203+
@Override
204+
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
205+
final List<PropertyDescriptor> properties = new ArrayList<>();
206+
properties.add(CKAN_HOST);
207+
properties.add(CKAN_PORT);
208+
properties.add(CKAN_VIEWER);
209+
properties.add(CKAN_API_KEY);
210+
properties.add(ORION_URL);
211+
properties.add(SSL);
212+
properties.add(NGSI_VERSION);
213+
properties.add(DATA_MODEL);
214+
properties.add(ATTR_PERSISTENCE);
215+
properties.add(DEFAULT_SERVICE);
216+
properties.add(DEFAULT_SERVICE_PATH);
217+
properties.add(ENABLE_ENCODING);
218+
properties.add(ENABLE_LOWERCASE);
219+
properties.add(BATCH_SIZE);
220+
properties.add(MAX_CONNECTIONS);
221+
properties.add(MAX_CONNECTIONS_PER_ROUTE);
222+
properties.add(RollbackOnFailure.ROLLBACK_ON_FAILURE);
223+
return properties;
224+
}
225+
226+
@Override
227+
public Set<Relationship> getRelationships() {
228+
final Set<Relationship> rels = new HashSet<>();
229+
rels.add(REL_SUCCESS);
230+
rels.add(REL_RETRY);
231+
rels.add(REL_FAILURE);
232+
return rels;
233+
}
234+
235+
protected void persistFlowFile(final ProcessContext context, final FlowFile flowFile, ProcessSession session) {
236+
237+
final String[] host = {context.getProperty(CKAN_HOST).getValue()};
238+
final String port = context.getProperty(CKAN_PORT).getValue();
239+
final String apiKey = context.getProperty(CKAN_API_KEY).getValue();
240+
final String ckanViewer = context.getProperty(CKAN_VIEWER).getValue();
241+
final String orioUrl = context.getProperty(ORION_URL).getValue();
242+
final boolean ssl = context.getProperty(SSL).asBoolean();
243+
final int maxConnections = context.getProperty(MAX_CONNECTIONS).asInteger();
244+
final int maxConnectionsPerRoute = context.getProperty(MAX_CONNECTIONS_PER_ROUTE).asInteger();
245+
final boolean enableEncoding = context.getProperty(ENABLE_ENCODING).asBoolean();
246+
final boolean enableLowercase = context.getProperty(ENABLE_LOWERCASE).asBoolean();
247+
final CkanBackend ckanBackend = new CkanBackend(apiKey,host,port,orioUrl,ssl,maxConnections,maxConnectionsPerRoute,ckanViewer);
248+
NGSIUtils n = new NGSIUtils();
249+
final String ngsiVersion=context.getProperty(NGSI_VERSION).getValue();
250+
final String dataModel=context.getProperty(DATA_MODEL).getValue();
251+
252+
final NGSIEvent event=n.getEventFromFlowFile(flowFile,session,ngsiVersion);
253+
final long creationTime = event.getCreationTime();
254+
final String fiwareService = (event.getFiwareService().compareToIgnoreCase("nd")==0)?context.getProperty(DEFAULT_SERVICE).getValue():event.getFiwareService();
255+
final String fiwareServicePath = ("ld".equals(context.getProperty(NGSI_VERSION).getValue()))?"":(event.getFiwareServicePath().compareToIgnoreCase("/nd")==0)?context.getProperty(DEFAULT_SERVICE_PATH).getValue():event.getFiwareServicePath();
256+
try {
257+
final String orgName = ckanBackend.buildOrgName(fiwareService,dataModel,enableEncoding,enableLowercase,ngsiVersion);
258+
ArrayList<Entity> entities= new ArrayList<>();
259+
entities = ("ld".equals(context.getProperty(NGSI_VERSION).getValue()))?event.getEntitiesLD():event.getEntities();
260+
for (Entity entity : event.getEntities()) {
261+
final String pkgName = ckanBackend.buildPkgName(fiwareService,entity,dataModel,enableEncoding,enableLowercase,ngsiVersion);
262+
final String resName = ckanBackend.buildResName(entity,dataModel,enableEncoding,enableLowercase,ngsiVersion);
263+
264+
} // for
265+
266+
}catch (Exception e){
267+
getLogger().error(e.toString());
268+
}
269+
}
270+
271+
@Override
272+
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
273+
final FlowFile flowFile = session.get();
274+
if (flowFile == null) {
275+
return;
276+
}
277+
278+
final ComponentLog logger = getLogger();
279+
280+
try {
281+
persistFlowFile(context, flowFile, session);
282+
logger.info("inserted {} into CKAN", new Object[]{flowFile});
283+
session.getProvenanceReporter().send(flowFile, "report");
284+
session.transfer(flowFile, REL_SUCCESS);
285+
} catch (Exception e) {
286+
logger.error("Failed to insert {} into CKAN due to {}", new Object[] {flowFile, e}, e);
287+
session.transfer(flowFile, REL_FAILURE);
288+
context.yield();
289+
}
290+
}
291+
292+
}
293+

nifi-ngsi-bundle/nifi-ngsi-processors/src/main/java/org/apache/nifi/processors/ngsi/NGSIToPostgreSQL.java

100644100755
Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -268,12 +268,13 @@ void apply(final ProcessContext context, final ProcessSession session, final Fun
268268
JdbcCommon.setParameters(stmt, flowFile.getAttributes());
269269
try {
270270
System.out.println(postgres.checkColumnNames(tableName));
271-
ResultSet rs = conn.createStatement().executeQuery(postgres.checkColumnNames(tableName));
272-
newColumns = postgres.getNewColumns(rs,listOfFields);
273271
conn.createStatement().execute(postgres.createSchema(schemaName));
274-
conn.createStatement().execute(postgres.addColumns(schemaName,tableName,newColumns));
275-
276272
conn.createStatement().execute(postgres.createTable(schemaName, tableName,listOfFields));
273+
ResultSet rs = conn.createStatement().executeQuery(postgres.checkColumnNames(tableName));
274+
newColumns = postgres.getNewColumns(rs,listOfFields);
275+
if (newColumns.size()>0){
276+
conn.createStatement().execute(postgres.addColumns(schemaName,tableName,newColumns));
277+
}
277278
System.out.println(schemaName+"."+tableName+" columns -------- : ");
278279

279280
} catch (SQLException s) {

nifi-ngsi-bundle/nifi-ngsi-processors/src/main/java/org/apache/nifi/processors/ngsi/ngsi/backends/PostgreSQLBackend.java

100644100755
File mode changed.

0 commit comments

Comments
 (0)