1
0
mirror of synced 2025-12-23 21:07:12 -05:00

Retry improvements on bulk indexing (#45059)

Co-authored-by: Robert Sese <734194+rsese@users.noreply.github.com>
This commit is contained in:
Peter Bengtsson
2023-10-26 15:19:04 -04:00
committed by GitHub
parent fa4ca3e088
commit 0460d9fe1b
3 changed files with 153 additions and 43 deletions

View File

@@ -107,7 +107,7 @@ jobs:
# the whole job fast.
# As of June 2023, it takes about 10+ minutes to index one whole
# language and we have 8 non-English languages.
max-parallel: 2
max-parallel: 3
matrix:
language: ${{ fromJSON(needs.figureOutMatrix.outputs.matrix) }}
steps:
@@ -197,7 +197,8 @@ jobs:
run: |
./src/search/scripts/index-elasticsearch.js /tmp/records \
--language ${{ matrix.language }} \
--stagger-seconds 5
--stagger-seconds 5 \
--retries 5
- name: Check created indexes and aliases
run: |

View File

@@ -17,6 +17,9 @@
// }
// const ok = await retry(errorTest, mainFunction, config)
//
// Note that, by default, the sleep time is "exponential" by a factor of
// 1.5. So the first sleep will, in the above example,
// be 800ms. Then 1,200ms, Then 1,800ms. etc.
//
// [end-readme]
@@ -25,19 +28,49 @@ const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms))
export async function retryOnErrorTest(
errorTest,
callback,
{ attempts = 10, sleepTime = 1000, onError = () => {} } = {},
{
attempts = 4,
sleepTime = 1000,
exponential = 1.5,
jitterPercent = 25,
onError = () => {},
} = {},
) {
while (--attempts) {
while (true) {
try {
return await callback()
} catch (error) {
if (errorTest(error)) {
// console.warn('Sleeping on', error.message, { attempts })
if (onError) onError(error, attempts)
await sleep(sleepTime)
if (error instanceof Error && attempts > 0 && errorTest(error)) {
if (onError) onError(error, attempts, sleepTime)
attempts--
// The reason for the jitter is to avoid a thundering herd problem.
// Suppose two independent processes/threads start at the same time.
// They both fail, perhaps due to rate limiting. Now, if they both
// sleep for 30 seconds in the first retry attempt, it'll just
// clash again 30 seconds later. But if you add a bit of jitter, at
// the next attempt these independent processes/threads will now
// start at slightly different times.
// According to the Oxford English dictionary, they define "jitter" as:
//
// slight irregular movement, variation, or unsteadiness,
// especially in an electrical signal or electronic device.
//
await sleep(addJitter(sleepTime, jitterPercent))
if (exponential) {
sleepTime *= 2
}
} else {
throw error
}
}
}
}
function addJitter(num, percent) {
// Return the number plus between 0 and $percent of that number.
// For example, for 1,000 with a 20% jitter you might get 1133.4
// because you start with 1,000 and 13.4% is a random number between
// 0 and 20%.
return num + Math.random() * percent * 0.01 * num
}

View File

@@ -18,7 +18,6 @@ import dotenv from 'dotenv'
import { retryOnErrorTest } from '../../../script/helpers/retry-on-error-test.js'
import { languageKeys } from '#src/languages/lib/languages.js'
import { allVersions } from '#src/versions/lib/all-versions.js'
import statsd from '#src/observability/lib/statsd.js'
// Now you can optionally have set the ELASTICSEARCH_URL in your .env file.
dotenv.config()
@@ -49,6 +48,8 @@ const shortNames = Object.fromEntries(
const allVersionKeys = Object.keys(shortNames)
const DEFAULT_SLEEPTIME_SECONDS = 30
program
.description('Creates Elasticsearch index from records')
.option('-v, --verbose', 'Verbose outputs')
@@ -72,6 +73,28 @@ program
return parsed
},
)
.option(
'-r, --retries <count>',
'Number of retry attempts on recoverable network errors',
(value) => {
const parsed = parseInt(value, 10)
if (isNaN(parsed)) {
throw new InvalidArgumentError('Not a number.')
}
return parsed
},
)
.option(
'--sleep-time <seconds>',
`Number of seconds to sleep between each retry attempt (defaults to ${DEFAULT_SLEEPTIME_SECONDS})`,
(value) => {
const parsed = parseInt(value, 10)
if (isNaN(parsed)) {
throw new InvalidArgumentError('Not a number.')
}
return parsed
},
)
.argument('<source-directory>', 'where the indexable files are')
.parse(process.argv)
@@ -195,21 +218,27 @@ async function indexAll(node, sourceDirectory, opts) {
const prefix = indexPrefix ? `${indexPrefix}_` : ''
for (const language of languages) {
let count = 0
for (const versionKey of versionKeys) {
console.log(chalk.yellow(`Indexing ${chalk.bold(versionKey)} in ${chalk.bold(language)}`))
const indexName = `${prefix}github-docs-${versionKey}-${language}`
console.time(`Indexing ${indexName}`)
await indexVersion(client, indexName, versionKey, language, sourceDirectory, verbose)
console.timeEnd(`Indexing ${indexName}`)
const t0 = new Date()
await indexVersion(client, indexName, versionKey, language, sourceDirectory, opts)
const t1 = new Date()
console.log(chalk.green(`Finished indexing ${indexName}. Took ${formatTime(t1 - t0)}`))
if (verbose) {
console.log(`To view index: ${safeUrlDisplay(node + `/${indexName}`)}`)
console.log(`To search index: ${safeUrlDisplay(node + `/${indexName}/_search`)}`)
}
if (staggerSeconds) {
count++
// console.log({ count, versionKeysLength: versionKeys.length })
if (staggerSeconds && count < versionKeys.length - 1) {
console.log(`Sleeping for ${staggerSeconds} seconds...`)
await sleep(1000 * staggerSeconds)
}
// A bit of visual separation betweeen each version
console.log('')
}
}
}
@@ -247,21 +276,16 @@ function utcTimestamp() {
}
// Consider moving this to lib
async function indexVersion(
client,
indexName,
version,
language,
sourceDirectory,
verbose = false,
) {
async function indexVersion(client, indexName, version, language, sourceDirectory, opts) {
const { verbose } = opts
// Note, it's a bit "weird" that numbered releases versions are
// called the number but that's the convention the previous
// search backend used
const indexVersion = shortNames[version].hasNumberedReleases
const indexVersionName = shortNames[version].hasNumberedReleases
? shortNames[version].currentRelease
: shortNames[version].miscBaseName
const recordsName = `github-docs-${indexVersion}-${language}`
const recordsName = `github-docs-${indexVersionName}-${language}`
const records = await loadRecords(recordsName, sourceDirectory)
@@ -354,15 +378,6 @@ async function indexVersion(
return [{ index: { _index: thisAlias } }, record]
})
// It's important to use `client.bulk.bind(client)` here because
// `client.bulk` is a meta-function that is attached to the Client
// class. Internally, it depends on `this.` even though it's a
// free-standing function. So if called indirectly by the `statsd.asyncTimer`
// the `this` becomes undefined.
const timed = statsd.asyncTimer(client.bulk.bind(client), 'search.bulk_index', [
`version:${version}`,
`language:${language}`,
])
const bulkOptions = {
// Default is 'false'.
// It means that the index is NOT refreshed as documents are inserted.
@@ -373,7 +388,36 @@ async function indexVersion(
// by a bot on a schedeule (GitHub Actions).
timeout: '5m',
}
const bulkResponse = await timed({ operations, ...bulkOptions })
const attempts = opts.retries || 0
const sleepTime = (opts.sleepTime || DEFAULT_SLEEPTIME_SECONDS) * 1000
console.log(`About to bulk index ${allRecords.length.toLocaleString()} records with retry %O`, {
attempts,
sleepTime,
})
const t0 = new Date()
const bulkResponse = await retryOnErrorTest(
(error) => {
// Rate limiting can happen when you're indexing too much at
// same time.
return error instanceof errors.ResponseError && error.meta.statusCode === 429
},
() => client.bulk({ operations, ...bulkOptions }),
{
attempts,
sleepTime,
onError: (_, attempts, sleepTime) => {
console.warn(
chalk.yellow(
`Failed to bulk index ${indexName}. Will attempt ${attempts} more times (after ${
sleepTime / 1000
}s sleep).`,
),
)
},
},
)
if (bulkResponse.errors) {
// Some day, when we're more confident how and why this might happen
@@ -384,9 +428,28 @@ async function indexVersion(
console.error(`Bulk response errors: ${bulkResponse.errors}`)
throw new Error('Bulk errors happened.')
}
const t1 = new Date()
console.log(`Bulk indexed ${thisAlias}. Took ${formatTime(t1 - t0)}`)
// The counting of documents in the index is async and can take a while
// to reflect. So send count requests until we get the right number.
let documentsInIndex = 0
let countAttempts = 3
while (documentsInIndex < allRecords.length) {
const { count } = await client.count({ index: thisAlias })
console.log(`Documents now in ${chalk.bold(thisAlias)}: ${chalk.bold(count.toLocaleString())}`)
documentsInIndex = count
if (documentsInIndex >= allRecords.length) break
countAttempts--
if (!countAttempts) {
console.log(`After ${countAttempts} attempts still haven't matched the expected number.`)
break
}
await sleep(1000)
}
console.log(
`Documents now in ${chalk.bold(thisAlias)}: ${chalk.bold(documentsInIndex.toLocaleString())}`,
)
// To perform an atomic operation that creates the new alias and removes
// the old indexes, we can use the updateAliases API with a body that
@@ -404,21 +467,23 @@ async function indexVersion(
]
console.log(`Alias ${indexName} -> ${thisAlias}`)
// const indices = await client.cat.indices({ format: 'json' })
console.log('About to get indices with retry %O', { attempts, sleepTime })
const indices = await retryOnErrorTest(
(error) => {
return error instanceof errors.ResponseError && error.statusCode === 404
// 404 can happen when you're trying to get an index that
// doesn't exist. ...yet!
return error instanceof errors.ResponseError && error.meta.statusCode === 404
},
() => client.cat.indices({ format: 'json' }),
{
// Combined, this is a total of 30 seconds which is not long
// for an Action that runs based on automation.
attempts: 10,
sleepTime: 3000,
onError: (error, attempts) => {
attempts,
sleepTime,
onError: (error, attempts, sleepTime) => {
console.warn(
chalk.yellow(
`Failed to get a list of indexes for '${indexName}' (${error.message}). Will attempt ${attempts} more times.`,
`Failed to get index ${indexName} (${
error.message || error.toString()
}). Will attempt ${attempts} more times (after ${formatTime(sleepTime)}s sleep).`,
),
)
},
@@ -458,3 +523,14 @@ function getSnowballLanguage(language) {
pt: 'Portuguese',
}[language]
}
function formatTime(ms) {
if (ms < 1000) {
return `${ms.toFixed(1)}ms`
}
const seconds = ms / 1000
if (seconds > 60) {
return `${Math.round(seconds / 60)}m${Math.round(seconds % 60)}s`
}
return `${seconds.toFixed(1)}s`
}