test webhooks

This commit is contained in:
Manuel Romero
2024-08-30 11:19:31 +02:00
parent 5e80311c2b
commit 3a8bc61d10
10 changed files with 121 additions and 18 deletions

View File

@@ -1,6 +1,6 @@
{
"name": "qmi-cloud-app",
"version": "5.0.2",
"version": "5.0.3",
"scripts": {
"start": "node server/server.js",
"start:dev": "nodemon server/server.js",

View File

@@ -0,0 +1,32 @@
const mongoose = require('mongoose');
// NOTE: removed unused `const { scenario } = require('../mongo');` — the
// binding was never referenced (the `scenario` schema key below is unrelated).

/**
 * Webhook registration: a URL to POST to when a provision lifecycle
 * event fires for a given scenario.
 */
const schema = new mongoose.Schema({
    // Creation timestamp; indexed for time-ordered queries.
    created: {
        type: Date,
        default: Date.now,
        index: true
    },
    // User that registered the webhook.
    owner: {
        type: mongoose.Types.ObjectId,
        ref: 'User'
    },
    // One of: provision-finished, provision-failed, provision-destroyed.
    eventType: {
        type: String
    },
    // Scenario name the webhook applies to.
    scenario: {
        type: String
    },
    // Target URL the event payload is POSTed to.
    url: {
        type: String
    },
    // Optional HTTP headers sent with the request.
    headers: {
        type: Object
    },
    // Optional query parameters for the request.
    queryparams: {
        type: Object
    }
});

module.exports = mongoose.model('Webhook', schema);

View File

@@ -39,6 +39,7 @@ const SharedProvision = require('./models/SharedProvision');
const TrainingTemplate = require('./models/TrainingTemplate');
const TrainingSession = require('./models/TrainingSession');
const TrainingStudent = require('./models/TrainingStudent');
const Webhook = require('./models/Webhook');
const getNewCountExtend = function(provision) {
return provision.countExtend !== undefined? (provision.countExtend + 1) : 1;
@@ -412,6 +413,7 @@ module.exports = {
trainingSession: _m(TrainingSession),
trainingTemplate: _m(TrainingTemplate),
trainingStudent: _m(TrainingStudent),
webhook: _m(Webhook),
utils: {
getNewTimeRunning: getNewTimeRunning,
getNewCountExtend: getNewCountExtend
@@ -431,7 +433,8 @@ module.exports = {
SharedProvision: SharedProvision,
TrainingSession: TrainingSession,
TrainingTemplate: TrainingTemplate,
TrainingStudent: TrainingStudent
TrainingStudent: TrainingStudent,
Webhook: Webhook
}
};

View File

@@ -6,6 +6,7 @@ const TF_DESTROY_QUEUE = 'TF_DESTROY_QUEUE';
const TF_APPLY_QSEOK_QUEUE = 'TF_APPLY_QSEOK_QUEUE';
const STOP_CONTAINER_QUEUE = 'STOP_CONTAINER_QUEUE';
const SYNAPSE_QUEUE = 'SYNAPSE_QUEUE';
const WEBHOOK_QUEUE = 'WEBHOOK_QUEUE';
var terraformApplyQueue = new Queue(TF_APPLY_QUEUE, process.env.REDIS_URL);
@@ -13,6 +14,7 @@ var terraformDestroyQueue = new Queue(TF_DESTROY_QUEUE, process.env.REDIS_URL);
var terraformApplyQseokQueue = new Queue(TF_APPLY_QSEOK_QUEUE, process.env.REDIS_URL);
var stopContainerQueue = new Queue(STOP_CONTAINER_QUEUE, process.env.REDIS_URL);
var synapseQueue = new Queue(SYNAPSE_QUEUE, process.env.REDIS_URL);
var webhookQueue = new Queue(WEBHOOK_QUEUE, process.env.REDIS_URL);
const queues = {
@@ -20,7 +22,8 @@ const queues = {
[TF_DESTROY_QUEUE]: terraformDestroyQueue,
[TF_APPLY_QSEOK_QUEUE]: terraformApplyQseokQueue,
[STOP_CONTAINER_QUEUE]: stopContainerQueue,
[SYNAPSE_QUEUE]: synapseQueue
[SYNAPSE_QUEUE]: synapseQueue,
[WEBHOOK_QUEUE]: webhookQueue
};
@@ -67,6 +70,7 @@ module.exports.TF_DESTROY_QUEUE = TF_DESTROY_QUEUE;
module.exports.TF_APPLY_QSEOK_QUEUE = TF_APPLY_QSEOK_QUEUE;
module.exports.STOP_CONTAINER_QUEUE = STOP_CONTAINER_QUEUE;
module.exports.SYNAPSE_QUEUE = SYNAPSE_QUEUE;
module.exports.WEBHOOK_QUEUE= WEBHOOK_QUEUE;
module.exports.queues = queues;

View File

@@ -110,6 +110,13 @@ module.exports = async function(job) {
sendEmail.sendProvisionError(prov, job.data._scenario);
}
queues[WEBHOOK_QUEUE].add("webhook_job", {
provId: prov._id,
scenario: prov.scenario,
user: triggerUser._id,
eventType: "provision-finished"
});
return Promise.resolve({"success": true, provMongo: prov});
} ).catch( function(err) {
console.log("ProcessorApply# Provision: error", err);
@@ -121,6 +128,14 @@ module.exports = async function(job) {
if ( errormsg !== "aborted") {
sendEmail.sendProvisionError(prov, job.data._scenario);
}
queues[WEBHOOK_QUEUE].add("webhook_job", {
provId: prov._id,
scenario: prov.scenario,
user: triggerUser._id,
eventType: "provision-failed"
});
return Promise.reject({"success": false, "error": err});
} );
}

View File

@@ -5,6 +5,7 @@ const TF_APPLY_QSEOK_QUEUE = MYQUEUES.TF_APPLY_QSEOK_QUEUE;
const TF_DESTROY_QUEUE = MYQUEUES.TF_DESTROY_QUEUE;
const STOP_CONTAINER_QUEUE = MYQUEUES.STOP_CONTAINER_QUEUE;
const SYNAPSE_QUEUE = MYQUEUES.SYNAPSE_QUEUE;
const WEBHOOK_QUEUE = MYQUEUES.WEBHOOK_QUEUE;
const queues = MYQUEUES.queues;
var path = require("path");
@@ -15,6 +16,7 @@ queues[TF_APPLY_QSEOK_QUEUE].process("tf_apply_qseok_job", 10, path.resolve(__di
queues[TF_DESTROY_QUEUE].process("tf_destroy_job", 10, path.resolve(__dirname, 'processor-destroy.js'));
queues[STOP_CONTAINER_QUEUE].process("tf_abort_apply_job", 10, path.resolve(__dirname, 'processor-stop-container.js'));
queues[SYNAPSE_QUEUE].process("synapse_job", 10, path.resolve(__dirname, 'processor-synapse.js'));
queues[WEBHOOK_QUEUE].process("webhook_job", 10, path.resolve(__dirname, 'processor-webhook.js'));
console.log(`Worker queues started!`);

View File

@@ -1,6 +1,6 @@
{
"name": "qmi-cloud-worker",
"version": "2.1.1",
"version": "2.2.0",
"scripts": {
"start": "node index.js",
"start:dev": "nodemon index.js",

View File

@@ -3,20 +3,6 @@ const commonApply = require("./common-apply");
module.exports = function(job){
return commonApply(job);
/*.then(function(res) {
const provId = res.provMongo._id.toString();
const kubeconfig = res.provMongo.outputs.kube_config;
return kubectl.apply(provId, kubeconfig)
.then(function(){
console.log("DONE KUBECTL APPLY!!!!!!!!!!!!");
return kubectl.getpod(provId, kubeconfig);
}).then(function(){
console.log("DONE KUBECTL GETPOD!!!!!!!!!!!!");
});
});*/
}

View File

@@ -0,0 +1,55 @@
const db = require('qmi-cloud-common/mongo');
const axios = require('axios');
const https = require("https");
module.exports = async function(job) {
var filter = { "eventType": job.data.eventType};
var webhooks = await db.webhook.get(filter);
if ( !webhooks || webhooks.length === 0 ) {
console.log(`ProcessorWebhook# Error: Not found Webhooks to execute for event '${job.data.eventType}'` );
return Promise.resolve({"success": false, "err": "Not found Webhooks to execute for event '${job.data.eventType}'"});
}
console.log(`ProcessorWebhook# Webhooks to execute for eventType '${webhook.eventType}' and scenario '${webhook.scenario}'` );
var provision = null;
if ( job.data.provId ) {
provision = await db.provision.getById(job.data.provId);
}
webhooks.forEach(wh => {
const url = wh.url;
const payload = {
event: wh.eventType,
scenario: wh.scenario,
data: provision
};
const headers = wh.headers;
try {
console.log(`ProcessorWebhook# POST to URL '${url}' and event'${webhook.eventType}'` );
axios({
url: url,
method: "post",
data: payload,
httpsAgent: new https.Agent({
rejectUnauthorized: false
}),
headers: headers
});
} catch (error) {
console.log(`ProcessorWebhook# ERROR!! Executing POST URL '${url}' and event'${webhook.eventType}'` );
}
});
return Promise.resolve({"success": true, "msg": "done!"});
}

View File

@@ -10,6 +10,7 @@ const TF_APPLY_QSEOK_QUEUE = MYQUEUES.TF_APPLY_QSEOK_QUEUE;
const TF_DESTROY_QUEUE = MYQUEUES.TF_DESTROY_QUEUE;
const STOP_CONTAINER_QUEUE = MYQUEUES.STOP_CONTAINER_QUEUE;
const SYNAPSE_QUEUE = MYQUEUES.SYNAPSE_QUEUE;
const WEBHOOK_QUEUE = MYQUEUES.WEBHOOK_QUEUE;
const app = express();
const routesApiScenarios = require('./routes/api-scenarios');
@@ -79,6 +80,11 @@ app.use('/arena', Arena(
name: SYNAPSE_QUEUE,
hostId: 'Worker',
redis: _getRedisConfig(process.env.REDIS_URL)
},
{
name: WEBHOOK_QUEUE,
hostId: 'Worker',
redis: _getRedisConfig(process.env.REDIS_URL)
}
]
},