feat: scheduler + worker pool

parent 6ce29bdb
...@@ -122,3 +122,11 @@ dataPath: ./data ...@@ -122,3 +122,11 @@ dataPath: ./data
# file uploads. # file uploads.
bodyParserLimit: 5mb bodyParserLimit: 5mb
# ---------------------------------------------------------------------
# Workers Limit
# ---------------------------------------------------------------------
# Maximum number of workers that can run background cpu-intensive jobs.
# Leave to 'auto' to use CPU cores count as maximum.
workers: auto
...@@ -88,7 +88,6 @@ ...@@ -88,7 +88,6 @@
"he": "1.2.0", "he": "1.2.0",
"highlight.js": "10.3.1", "highlight.js": "10.3.1",
"i18next": "19.8.3", "i18next": "19.8.3",
"i18next-express-middleware": "2.0.0",
"i18next-node-fs-backend": "2.1.3", "i18next-node-fs-backend": "2.1.3",
"image-size": "0.9.2", "image-size": "0.9.2",
"js-base64": "3.7.2", "js-base64": "3.7.2",
...@@ -153,6 +152,7 @@ ...@@ -153,6 +152,7 @@
"pg-pubsub": "0.8.0", "pg-pubsub": "0.8.0",
"pg-query-stream": "4.2.4", "pg-query-stream": "4.2.4",
"pg-tsquery": "8.4.0", "pg-tsquery": "8.4.0",
"poolifier": "2.2.0",
"pug": "3.0.2", "pug": "3.0.2",
"punycode": "2.1.1", "punycode": "2.1.1",
"puppeteer-core": "17.1.3", "puppeteer-core": "17.1.3",
......
...@@ -27,34 +27,16 @@ defaults: ...@@ -27,34 +27,16 @@ defaults:
logLevel: info logLevel: info
logFormat: default logFormat: default
offline: false offline: false
dataPath: ./data
bodyParserLimit: 5mb bodyParserLimit: 5mb
workers: auto
# DB defaults # DB defaults
api: api:
isEnabled: false isEnabled: false
graphEndpoint: 'https://graph.requarks.io'
lang:
code: en
autoUpdate: true
namespaces: []
namespacing: false
rtl: false
telemetry:
clientId: ''
isEnabled: false
title: Wiki.js
company: ''
contentLicense: ''
logoUrl: https://static.requarks.io/logo/wikijs-butterfly.svg
mail: mail:
host: '' host: ''
secure: true secure: true
verifySSL: true verifySSL: true
nav:
mode: 'MIXED'
theming:
theme: 'default'
iconset: 'md'
darkMode: false
auth: auth:
autoLogin: false autoLogin: false
enforce2FA: false enforce2FA: false
...@@ -63,34 +45,30 @@ defaults: ...@@ -63,34 +45,30 @@ defaults:
audience: 'urn:wiki.js' audience: 'urn:wiki.js'
tokenExpiration: '30m' tokenExpiration: '30m'
tokenRenewal: '14d' tokenRenewal: '14d'
features:
featurePageRatings: true
featurePageComments: true
featurePersonalWikis: true
security: security:
securityOpenRedirect: true corsMode: 'OFF'
securityIframe: true corsConfig: ''
securityReferrerPolicy: true enforceCsp: false
securityTrustProxy: true trustProxy: false
securitySRI: true enforceHsts: false
securityHSTS: false disallowFloc: true
securityHSTSDuration: 300 hstsDuration: 0
securityCSP: false cspDirectives: ''
securityCSPDirectives: '' uploadScanSVG: true
server: disallowIframe: true
sslRedir: false uploadMaxFiles: 20
uploads: authJwtAudience: 'urn:wiki.js'
maxFileSize: 5242880 authJwtExpiration: '30m'
maxFiles: 10 uploadMaxFileSize: 10485760
scanSVG: true forceAssetDownload: true
forceDownload: true disallowOpenRedirect: true
authJwtRenewablePeriod: '14d'
enforceSameOriginReferrerPolicy: true
flags: flags:
ldapdebug: false ldapdebug: false
sqllog: false sqllog: false
# System defaults # System defaults
channel: NEXT channel: NEXT
setup: false
dataPath: ./data
cors: cors:
credentials: true credentials: true
maxAge: 600 maxAge: 600
...@@ -99,10 +77,6 @@ defaults: ...@@ -99,10 +77,6 @@ defaults:
search: search:
maxHits: 100 maxHits: 100
maintainerEmail: security@requarks.io maintainerEmail: security@requarks.io
localeNamespaces:
- admin
- auth
- common
jobs: jobs:
purgeUploads: purgeUploads:
onInit: true onInit: true
......
const _ = require('lodash') const _ = require('lodash')
const EventEmitter = require('eventemitter2').EventEmitter2 const EventEmitter = require('eventemitter2').EventEmitter2
let isShuttingDown = false
/* global WIKI */ /* global WIKI */
module.exports = { module.exports = {
...@@ -31,9 +33,8 @@ module.exports = { ...@@ -31,9 +33,8 @@ module.exports = {
*/ */
async preBootWeb() { async preBootWeb() {
try { try {
WIKI.sideloader = await require('./sideloader').init()
WIKI.cache = require('./cache').init() WIKI.cache = require('./cache').init()
WIKI.scheduler = require('./scheduler').init() WIKI.scheduler = await require('./scheduler').init()
WIKI.servers = require('./servers') WIKI.servers = require('./servers')
WIKI.events = { WIKI.events = {
inbound: new EventEmitter(), inbound: new EventEmitter(),
...@@ -83,6 +84,8 @@ module.exports = { ...@@ -83,6 +84,8 @@ module.exports = {
* Graceful shutdown * Graceful shutdown
*/ */
async shutdown (devMode = false) { async shutdown (devMode = false) {
if (isShuttingDown) { return }
isShuttingDown = true
if (WIKI.servers) { if (WIKI.servers) {
await WIKI.servers.stopServers() await WIKI.servers.stopServers()
} }
...@@ -99,6 +102,7 @@ module.exports = { ...@@ -99,6 +102,7 @@ module.exports = {
await WIKI.asar.unload() await WIKI.asar.unload()
} }
if (!devMode) { if (!devMode) {
WIKI.logger.info('Terminating process...')
process.exit(0) process.exit(0)
} }
} }
......
const _ = require('lodash')
const dotize = require('dotize')
const i18nMW = require('i18next-express-middleware')
const i18next = require('i18next')
const Promise = require('bluebird')
const fs = require('fs-extra')
const path = require('path')
const yaml = require('js-yaml')
/* global WIKI */
module.exports = {
engine: null,
namespaces: [],
init() {
this.namespaces = WIKI.data.localeNamespaces
this.engine = i18next
this.engine.init({
load: 'languageOnly',
ns: this.namespaces,
defaultNS: 'common',
saveMissing: false,
lng: WIKI.config.lang.code,
fallbackLng: 'en'
})
// Load current language + namespaces
this.refreshNamespaces(true)
return this
},
/**
* Attach i18n middleware for Express
*
* @param {Object} app Express Instance
*/
attachMiddleware (app) {
app.use(i18nMW.handle(this.engine))
},
/**
* Get all entries for a specific locale and namespace
*
* @param {String} locale Locale code
* @param {String} namespace Namespace
*/
async getByNamespace(locale, namespace) {
if (this.engine.hasResourceBundle(locale, namespace)) {
let data = this.engine.getResourceBundle(locale, namespace)
return _.map(dotize.convert(data), (value, key) => {
return {
key,
value
}
})
} else {
throw new Error('Invalid locale or namespace')
}
},
/**
* Load entries from the DB for a single locale
*
* @param {String} locale Locale code
* @param {*} opts Additional options
*/
async loadLocale(locale, opts = { silent: false }) {
const res = await WIKI.models.locales.query().findOne('code', locale)
if (res) {
if (_.isPlainObject(res.strings)) {
_.forOwn(res.strings, (data, ns) => {
this.namespaces.push(ns)
this.engine.addResourceBundle(locale, ns, data, true, true)
})
}
} else if (!opts.silent) {
throw new Error('No such locale in local store.')
}
// -> Load dev locale files if present
if (WIKI.IS_DEBUG) {
try {
const devEntriesRaw = await fs.readFile(path.join(WIKI.SERVERPATH, `locales/${locale}.yml`), 'utf8')
if (devEntriesRaw) {
const devEntries = yaml.safeLoad(devEntriesRaw)
_.forOwn(devEntries, (data, ns) => {
this.namespaces.push(ns)
this.engine.addResourceBundle(locale, ns, data, true, true)
})
WIKI.logger.info(`Loaded dev locales from ${locale}.yml`)
}
} catch (err) {
// ignore
}
}
},
/**
* Reload all namespaces for all active locales from the DB
*
* @param {Boolean} silent No error on fail
*/
async refreshNamespaces (silent = false) {
await this.loadLocale(WIKI.config.lang.code, { silent })
if (WIKI.config.lang.namespacing) {
for (let ns of WIKI.config.lang.namespaces) {
await this.loadLocale(ns, { silent })
}
}
},
/**
* Set the active locale
*
* @param {String} locale Locale code
*/
async setCurrentLocale(locale) {
await Promise.fromCallback(cb => {
return this.engine.changeLanguage(locale, cb)
})
}
}
const PgBoss = require('pg-boss') const PgBoss = require('pg-boss')
const { DynamicThreadPool } = require('poolifier')
const os = require('node:os')
/* global WIKI */ /* global WIKI */
module.exports = { module.exports = {
pool: null,
scheduler: null, scheduler: null,
jobs: [], async init () {
init () {
WIKI.logger.info('Initializing Scheduler...') WIKI.logger.info('Initializing Scheduler...')
this.scheduler = new PgBoss({ this.scheduler = new PgBoss({
...WIKI.models.knex.client.connectionSettings, db: {
close: () => Promise.resolve('ok'),
executeSql: async (text, values) => {
try {
const resource = await WIKI.models.knex.client.pool.acquire().promise
const res = await resource.query(text, values)
WIKI.models.knex.client.pool.release(resource)
return res
} catch (err) {
WIKI.logger.error('Failed to acquire DB connection during scheduler query execution.')
WIKI.logger.error(err)
}
}
},
// ...WIKI.models.knex.client.connectionSettings,
application_name: 'Wiki.js Scheduler', application_name: 'Wiki.js Scheduler',
schema: WIKI.config.db.schemas.scheduler, schema: WIKI.config.db.schemas.scheduler,
uuid: 'v4' uuid: 'v4'
}) })
const maxWorkers = WIKI.config.workers === 'auto' ? os.cpus().length : WIKI.config.workers
WIKI.logger.info(`Initializing Worker Pool (Max ${maxWorkers})...`)
this.pool = new DynamicThreadPool(1, maxWorkers, './server/worker.js', {
errorHandler: (err) => WIKI.logger.warn(err),
exitHandler: () => WIKI.logger.debug('A worker has gone offline.'),
onlineHandler: () => WIKI.logger.debug('New worker is online.')
})
return this return this
}, },
async start () { async start () {
WIKI.logger.info('Starting Scheduler...') WIKI.logger.info('Starting Scheduler...')
await this.scheduler.start() await this.scheduler.start()
this.scheduler.work('*', async job => {
return this.pool.execute({
id: job.id,
name: job.name,
data: job.data
})
})
WIKI.logger.info('Scheduler: [ STARTED ]') WIKI.logger.info('Scheduler: [ STARTED ]')
}, },
async stop () { async stop () {
WIKI.logger.info('Stopping Scheduler...') WIKI.logger.info('Stopping Scheduler...')
await this.scheduler.stop() await this.scheduler.stop()
await this.pool.destroy()
WIKI.logger.info('Scheduler: [ STOPPED ]') WIKI.logger.info('Scheduler: [ STOPPED ]')
} }
} }
...@@ -18,7 +18,6 @@ module.exports = async () => { ...@@ -18,7 +18,6 @@ module.exports = async () => {
// ---------------------------------------- // ----------------------------------------
WIKI.auth = require('./core/auth').init() WIKI.auth = require('./core/auth').init()
WIKI.lang = require('./core/localization').init()
WIKI.mail = require('./core/mail').init() WIKI.mail = require('./core/mail').init()
WIKI.system = require('./core/system').init() WIKI.system = require('./core/system').init()
...@@ -134,12 +133,6 @@ module.exports = async () => { ...@@ -134,12 +133,6 @@ module.exports = async () => {
app.use(bodyParser.urlencoded({ extended: false, limit: '1mb' })) app.use(bodyParser.urlencoded({ extended: false, limit: '1mb' }))
// ---------------------------------------- // ----------------------------------------
// Localization
// ----------------------------------------
WIKI.lang.attachMiddleware(app)
// ----------------------------------------
// View accessible data // View accessible data
// ---------------------------------------- // ----------------------------------------
......
const { ThreadWorker } = require('poolifier')
module.exports = new ThreadWorker(async (job) => {
return { ok: true }
}, { async: true })
This diff was suppressed by a .gitattributes entry.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment