Skip to content

Commit

Permalink
feat: allow doc overwrite in onDocument
Browse files Browse the repository at this point in the history
  • Loading branch information
robdasilva committed Jul 13, 2022
1 parent 282c76f commit b0d034e
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 12 deletions.
25 changes: 13 additions & 12 deletions src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ export interface BulkStats {
aborted: boolean
}

interface IndexAction {
interface IndexActionOperation {
index: T.BulkIndexOperation
}

interface CreateAction {
interface CreateActionOperation {
create: T.BulkCreateOperation
}

Expand All @@ -90,7 +90,9 @@ interface DeleteAction {
delete: T.BulkDeleteOperation
}

type UpdateAction = [UpdateActionOperation, Record<string, any>]
type CreateAction = CreateActionOperation | [CreateActionOperation, unknown]
type IndexAction = IndexActionOperation | [IndexActionOperation, unknown]
type UpdateAction = [UpdateActionOperation, T.BulkUpdateAction]
type Action = IndexAction | CreateAction | UpdateAction | DeleteAction

export interface OnDropDocument<TDocument = unknown> {
Expand Down Expand Up @@ -619,22 +621,21 @@ export default class Helpers {
for await (const chunk of datasource) {
if (shouldAbort) break
timeoutRef.refresh()
const action = onDocument(chunk)
const operation = Array.isArray(action)
? Object.keys(action[0])[0]
: Object.keys(action)[0]
const result = onDocument(chunk)
const [action, payload] = Array.isArray(result) ? result : [result, chunk]
const operation = Object.keys(action)[0]
if (operation === 'index' || operation === 'create') {
actionBody = serializer.serialize(action)
payloadBody = typeof chunk === 'string' ? chunk : serializer.serialize(chunk)
payloadBody = typeof payload === 'string'
? payload
: serializer.serialize(payload)
chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody)
bulkBody.push(actionBody, payloadBody)
} else if (operation === 'update') {
// @ts-expect-error in case of update action is an array
actionBody = serializer.serialize(action[0])
actionBody = serializer.serialize(action)
payloadBody = typeof chunk === 'string'
? `{"doc":${chunk}}`
// @ts-expect-error in case of update action is an array
: serializer.serialize({ doc: chunk, ...action[1] })
: serializer.serialize({ doc: chunk, ...payload })
chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody)
bulkBody.push(actionBody, payloadBody)
} else if (operation === 'delete') {
Expand Down
104 changes: 104 additions & 0 deletions test/unit/helpers/bulk.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,58 @@ test('bulk index', t => {
t.end()
})

t.test('Should use payload returned by `onDocument`', async t => {
let count = 0
const updatedAt = '1970-01-01T12:00:00.000Z'
const MockConnection = connection.buildMockConnection({
onRequest (params) {
t.equal(params.path, '/_bulk')
t.match(params.headers, {
'content-type': 'application/vnd.elasticsearch+x-ndjson; compatible-with=8',
'x-elastic-client-meta': `es=${clientVersion},js=${nodeVersion},t=${transportVersion},hc=${nodeVersion},h=bp`
})
// @ts-expect-error
const [action, payload] = params.body.split('\n')
t.same(JSON.parse(action), { index: { _index: 'test' } })
t.same(JSON.parse(payload), { ...dataset[count++], updatedAt })
return { body: { errors: false, items: [{}] } }
}
})

const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
const result = await client.helpers.bulk<Document>({
datasource: dataset.slice(),
flushBytes: 1,
concurrency: 1,
onDocument (doc) {
t.type(doc.user, 'string') // testing that doc is type of Document
return [
{
index: {
_index: 'test'
}
},
{ ...doc, updatedAt }]
},
onDrop (doc) {
t.fail('This should never be called')
}
})

t.type(result.time, 'number')
t.type(result.bytes, 'number')
t.match(result, {
total: 3,
successful: 3,
retry: 0,
failed: 0,
aborted: false
})
})

t.end()
})

Expand Down Expand Up @@ -835,6 +887,58 @@ test('bulk create', t => {
aborted: false
})
})

t.test('Should use payload returned by `onDocument`', async t => {
let count = 0
const updatedAt = '1970-01-01T12:00:00.000Z'
const MockConnection = connection.buildMockConnection({
onRequest (params) {
t.equal(params.path, '/_bulk')
t.match(params.headers, { 'content-type': 'application/vnd.elasticsearch+x-ndjson; compatible-with=8' })
// @ts-expect-error
const [action, payload] = params.body.split('\n')
t.same(JSON.parse(action), { create: { _index: 'test', _id: count } })
t.same(JSON.parse(payload), { ...dataset[count++], updatedAt })
return { body: { errors: false, items: [{}] } }
}
})

const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
let id = 0
const result = await client.helpers.bulk({
datasource: dataset.slice(),
flushBytes: 1,
concurrency: 1,
onDocument (doc) {
return [
{
create: {
_index: 'test',
_id: String(id++)
}
},
{ ...doc, updatedAt }
]
},
onDrop (doc) {
t.fail('This should never be called')
}
})

t.type(result.time, 'number')
t.type(result.bytes, 'number')
t.match(result, {
total: 3,
successful: 3,
retry: 0,
failed: 0,
aborted: false
})
})

t.end()
})

Expand Down

0 comments on commit b0d034e

Please sign in to comment.