Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
node_modules/
coverage/
20 changes: 20 additions & 0 deletions __tests__/eval.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
const test = require('tape')
const sinon = require('sinon')
const evalInContext = require('../eval')

test('evalInContext - evals global variable', (t) => {
global.property = 'property'
t.equal(evalInContext('property', {}), 'property')
t.end()
})

test('evalInContext - evals function in context', (t) => {
const clientid = sinon.stub().returns('test')
t.equal(evalInContext('clientid()', { clientid }), 'test')
t.end()
})

test('throws error if variable does not exist', (t) => {
t.throws((() => evalInContext('notHere', {})))
t.end()
})
110 changes: 110 additions & 0 deletions __tests__/sql.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
const test = require('tape')
const sinon = require('sinon')
const { parseSelect, applySelect } = require('../sql.js')

test('parseSelect - parses simple SQL correctly', (t) => {
const subject = "SELECT * FROM 'topic'"
const results = parseSelect(subject)
t.deepEqual(results.select, [{ field: '*', alias: undefined }])
t.equal(results.topic, 'topic')
t.equal(results.where, undefined)
t.end()
})

test('parseSelect - parses lowercase simple SQL correctly', (t) => {
const subject = "select * from 'topic'"
const results = parseSelect(subject)
t.deepEqual(results.select, [{ field: '*', alias: undefined }])
t.equal(results.topic, 'topic')
t.equal(results.where, undefined)
t.end()
})

test('parseSelect - parses where clause correctly', (t) => {
const subject = "SELECT * FROM 'topic' WHERE name='Bob'"
const results = parseSelect(subject)
t.deepEqual(results.select, [{ field: '*', alias: undefined }])
t.equal(results.topic, 'topic')
t.equal(results.where, "name='Bob'")
t.end()
})

test('parseSelect - parses multiple SELECT properties correctly', (t) => {
const subject = "SELECT name, age, maleOrFemale AS gender FROM 'topic'"
const results = parseSelect(subject)
t.deepEqual(results.select, [
{ field: 'name', alias: undefined},
{ field: 'age', alias: undefined },
{ field: 'maleOrFemale', alias: 'gender'}
])
t.end()
})

test('applySelect - Simple select with buffered string handled correctly', (t) => {
const select = [{ field: '*', alias: undefined }]
const payload = Buffer.from(JSON.stringify({name: 'Bob'}), 'utf8')
const context = {}
const event = applySelect({ select, payload, context })
t.deepEqual(event, { name: 'Bob' })
t.end()
})

test('applySelect - Simple select with non-JSON handled correctly', (t) => {
const select = [{ field: '*', alias: undefined }]
const payload = 'Bob'
const context = {}
const event = applySelect({ select, payload, context })
t.equal(event, 'Bob')
t.end()
})

test('applySelect - Aliased wildcard with non-JSON handled correctly', (t) => {
const select = [{ field: '*', alias: 'name' }]
const payload = 'Bob'
const context = {}
const event = applySelect({ select, payload, context })
t.deepEqual(event, { 'name': 'Bob'})
t.end()
})

test('applySelect - Unaliased wildcard plus function results in flattened output', (t) => {
const select = [
{ field: '*', alias: undefined },
{ field: 'clientid()', alias: undefined }
]
const clientIdFunc = sinon.stub().returns(undefined);
const payload = Buffer.from(JSON.stringify({name: 'Bob'}), 'utf8')
const context = { clientid: clientIdFunc }
const event = applySelect({ select, payload, context })
t.ok(clientIdFunc.calledOnce)
t.deepEqual(event, { name: 'Bob', 'clientid()': undefined })
t.end()
})

test('applySelect - Aliased wildcard plus function results in nested output', (t) => {
const select = [
{ field: '*', alias: 'message' },
{ field: 'clientid()', alias: undefined }
]
const clientIdFunc = sinon.stub().returns(undefined);
const payload = Buffer.from(JSON.stringify({name: 'Bob'}), 'utf8')
const context = { clientid: clientIdFunc }
const event = applySelect({ select, payload, context })
t.ok(clientIdFunc.calledOnce)
t.deepEqual(event, { message: { name: 'Bob' }, 'clientid()': undefined })
t.end()
})

test('applySelect - Function results are appeneded to output', (t) => {
const select = [
{ field: '*', alias: 'message' },
{ field: 'clientid()', alias: 'theClientId' }
]
const clientIdFunc = sinon.stub().returns('12345')
const payload = Buffer.from(JSON.stringify({name: 'Bob'}), 'utf8')
const context = { clientid: clientIdFunc }
const event = applySelect({ select, payload, context })
t.ok(clientIdFunc.calledOnce)
t.deepEqual(event, { message: { name: 'Bob' }, 'theClientId': '12345' })
t.end()
})
2 changes: 1 addition & 1 deletion eval.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// TODO: trim(), ltrim(), etc

const evalInContext = (js, context) => {
const { clientid, topic } = context
const { clientid, topic, principal } = context
try {
return eval(js)
} catch (err) {
Expand Down
41 changes: 34 additions & 7 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class ServerlessIotLocal {
this.log = serverless.cli.log.bind(serverless.cli)
this.service = serverless.service
this.options = options
this.provider = 'aws'
this.provider = this.serverless.getProvider('aws')
this.mqttBroker = null
this.requests = {}

Expand Down Expand Up @@ -93,7 +93,18 @@ class ServerlessIotLocal {

startHandler() {
this.originalEnvironment = _.extend({ IS_OFFLINE: true }, process.env)
this.options = _.merge({}, defaultOpts, (this.service.custom || {})['serverless-iot-local'], this.options)

const custom = this.service.custom || {}
const inheritedFromServerlessOffline = _.pick(custom['serverless-offline'] || {}, ['skipCacheInvalidation'])

this.options = _.merge(
{},
defaultOpts,
inheritedFromServerlessOffline,
custom['serverless-iot-local'],
this.options
)

if (!this.options.noStart) {
this._createMQTTBroker()
}
Expand Down Expand Up @@ -166,6 +177,7 @@ class ServerlessIotLocal {
const { port, httpPort, location } = this.options
const topicsToFunctionsMap = {}
const { runtime } = this.service.provider
const stackName = this.provider.naming.getStackName()
Object.keys(this.service.functions).forEach(key => {
const fun = this._getFunction(key)
const funName = key
Expand Down Expand Up @@ -195,7 +207,11 @@ class ServerlessIotLocal {
const { sql } = iot
// hack
// assumes SELECT ... topic() as topic
const parsed = SQL.parseSelect(sql)
const parsed = SQL.parseSelect({
sql,
stackName,
})

const topicMatcher = parsed.topic
if (!topicsToFunctionsMap[topicMatcher]) {
topicsToFunctionsMap[topicMatcher] = []
Expand All @@ -214,9 +230,15 @@ class ServerlessIotLocal {
const client = mqtt.connect(`ws://localhost:${httpPort}/mqqt`)
client.on('error', console.error)

const connectMonitor = setInterval(() => {
this.log(`still haven't connected to local Iot broker!`)
}, 5000).unref()
let connectMonitor
const startMonitor = () => {
clearInterval(connectMonitor)
connectMonitor = setInterval(() => {
this.log(`still haven't connected to local Iot broker!`)
}, 5000).unref()
}

startMonitor()

client.on('connect', () => {
clearInterval(connectMonitor)
Expand All @@ -226,6 +248,8 @@ class ServerlessIotLocal {
}
})

client.on('disconnect', startMonitor)

client.on('message', (topic, message) => {
const matches = Object.keys(topicsToFunctionsMap)
.filter(topicMatcher => mqttMatch(topicMatcher, topic))
Expand All @@ -252,14 +276,17 @@ class ServerlessIotLocal {
payload: message,
context: {
topic: () => topic,
clientid: () => clientId
clientid: () => clientId,
principal: () => {}
}
})

let handler // The lambda function
try {
process.env = _.extend({}, this.service.provider.environment, this.service.functions[name].environment, this.originalEnvironment)
process.env.SERVERLESS_OFFLINE_PORT = apiGWPort
process.env.AWS_LAMBDA_FUNCTION_NAME = this.service.service + '-' + this.service.provider.stage
process.env.AWS_REGION = this.service.provider.region
handler = functionHelper.createHandler(options, this.options)
} catch (err) {
this.log(`Error while loading ${name}: ${err.stack}, ${requestId}`)
Expand Down
11 changes: 10 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
{
"name": "serverless-iot-local",
"version": "1.3.0",
"version": "2.1.2",
"description": "local iot events for the Serverless Framework",
"main": "index.js",
"repository": "https://github.com/tradle/serverless-iot-local",
"author": "mvayngrib",
"license": "MIT",
"scripts": {
"test": "tape '__tests__/**/*'",
"coverage": "istanbul cover tape tape '__tests__/**/*'"
},
"dependencies": {
"aws-sdk-mock": "^1.7.0",
"ip": "^1.1.5",
Expand All @@ -18,5 +22,10 @@
"peerDependencies": {
"aws-sdk": "*",
"serverless-offline": "*"
},
"devDependencies": {
"istanbul": "^0.4.5",
"sinon": "^5.0.3",
"tape": "^4.9.0"
}
}
30 changes: 26 additions & 4 deletions sql.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
const evalInContext = require('./eval')
const BASE64_PLACEHOLDER = '*b64'
const SQL_REGEX = /^SELECT (.*)\s+FROM\s+'([^']+)'\s*(?:WHERE\s(.*))?$/
const SELECT_PART_REGEX = /^(.*?)(?: as (.*))?$/
const SQL_REGEX = /^SELECT (.*)\s+FROM\s+'([^']+)'\s*(?:WHERE\s(.*))?$/i
const SELECT_PART_REGEX = /^(.*?)(?: AS (.*))?$/i

const parseSelect = sql => {
const parseSelect = ({ sql, stackName }) => {
// if (/\([^)]/.test(sql)) {
// throw new Error(`AWS Iot SQL functions in this sql are not yet supported: ${sql}`)
// }

if (typeof sql === 'object') {
const sub = sql['Fn::Sub']
if (!sub) {
throw new Error('expected sql to be a string or have Fn::Sub')
}

sql = sub.replace(/\$\{AWS::StackName\}/g, stackName)
}

const [select, topic, where] = sql.match(SQL_REGEX).slice(1)
return {
select: select
Expand Down Expand Up @@ -61,7 +70,20 @@ const applySelect = ({ select, payload, context }) => {
const { alias, field } = part
const key = alias || field
if (field === '*') {
event[key] = json
/*
* If there is an alias for the wildcard selector, we want to include the fields in a nested key.
* SELECT * as message, clientid() from 'topic'
* { message: { fieldOne: 'value', ...}}
*
* Otherwise, we want the fields flat in the resulting event object.
* SELECT *, clientid() from 'topic'
* { fieldOne: 'value', ...}
*/
if(alias) {
event[key] = json
} else {
Object.assign(event, json)
}
continue
}

Expand Down