Minor: fix Lineage export when there is no column / pipeline edge (#18737)

* Minor: fix Lineage export when there is no column / pipeline edge

* add test for lineage export (#18709)

Co-authored-by: Karan Hotchandani <[email protected]>

* add lineage export tests

---------

Co-authored-by: Karan Hotchandani <[email protected]>
Co-authored-by: Sweta Agarwalla <[email protected]>
Co-authored-by: karanh37 <[email protected]>
4 people committed Nov 24, 2024
1 parent ce4354b commit f220606
Showing 3 changed files with 204 additions and 29 deletions.
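
For context before the raw hunks: a minimal sketch of the per-edge branching that exportCsvAsync ends up with after this change. The wrapper method name writeEdgeRow is hypothetical, the types and helpers (CSVWriter, JsonNode, ColumnMapping, writeCsvRow, writePipelineRow, extractColumnMappingsFromEdge) come from the surrounding class as shown in the diff, and the exact argument positions in the column-mapping call are an assumption, since that call is partly collapsed in the hunk below.

private void writeEdgeRow(CSVWriter csvWriter, Map<String, String> baseRow, JsonNode edge) {
  JsonNode columns = edge.path("columns");
  JsonNode pipeline = edge.path("pipeline");

  if (columns.isArray() && columns.size() > 0) {
    // Column-level lineage: one CSV row per explicit column mapping.
    for (ColumnMapping mapping : extractColumnMappingsFromEdge(columns)) {
      writeCsvRow(
          csvWriter,
          baseRow,
          mapping.getFromChildFQN(), // assumed position of the child FQN columns
          mapping.getToChildFQN(),
          "", "", "", "", "", "", "");
    }
  } else if (!pipeline.isMissingNode() && !pipeline.isNull()) {
    // Pipeline edge: pipeline details are written by the extracted helper.
    writePipelineRow(csvWriter, baseRow, pipeline);
  } else {
    // Neither column mappings nor a pipeline: still emit the entity-level row
    // instead of skipping the edge, which is the case this fix covers.
    writeCsvRow(csvWriter, baseRow, "", "", "", "", "", "", "", "", "");
  }
}
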
@@ -338,9 +338,12 @@ public final String exportCsvAsync(
baseRow.put("toDomain", getDomainFQN(toEntity.path("domain")));

JsonNode columns = edge.path("columns");
if (columns.isArray() && !columns.isEmpty()) {
List<ColumnMapping> explicitColumnMappings = extractColumnMappingsFromEdge(columns);
for (ColumnMapping mapping : explicitColumnMappings) {
JsonNode pipeline = edge.path("pipeline");

if (columns.isArray() && columns.size() > 0) {
// Process column mappings
List<ColumnMapping> columnMappings = extractColumnMappingsFromEdge(columns);
for (ColumnMapping mapping : columnMappings) {
writeCsvRow(
csvWriter,
baseRow,
@@ -354,34 +357,14 @@ public final String exportCsvAsync(
"",
"");
LOG.debug(
"Exported explicit ColumnMapping: from='{}', to='{}'",
"Exported ColumnMapping: from='{}', to='{}'",
mapping.getFromChildFQN(),
mapping.getToChildFQN());
}
}

JsonNode pipeline = edge.path("pipeline");
if (!pipeline.isMissingNode() && !pipeline.isNull()) {
String pipelineName = getText(pipeline, "name");
String pipelineType = getText(pipeline, "serviceType");
String pipelineDescription = getText(pipeline, "description");
String pipelineOwners = getOwners(pipeline.path("owners"));
String pipelineServiceName = getText(pipeline.path("service"), "name");
String pipelineServiceType = getText(pipeline, "serviceType");
String pipelineDomain = getDomainFQN(pipeline.path("domain"));
writeCsvRow(
csvWriter,
baseRow,
"",
"",
pipelineName,
pipelineType,
pipelineDescription,
pipelineOwners,
pipelineDomain,
pipelineServiceName,
pipelineServiceType);
LOG.debug("Exported Pipeline Information: {}", pipelineName);
} else if (!pipeline.isMissingNode() && !pipeline.isNull()) {
writePipelineRow(csvWriter, baseRow, pipeline);
} else {
writeCsvRow(csvWriter, baseRow, "", "", "", "", "", "", "", "", "");
}
}
csvWriter.close();
@@ -391,6 +374,31 @@ public final String exportCsvAsync(
}
}

private void writePipelineRow(
CSVWriter csvWriter, Map<String, String> baseRow, JsonNode pipeline) {
String pipelineName = getText(pipeline, "name");
String pipelineType = getText(pipeline, "serviceType");
String pipelineDescription = getText(pipeline, "description");
String pipelineOwners = getOwners(pipeline.path("owners"));
String pipelineServiceName = getText(pipeline.path("service"), "name");
String pipelineServiceType = getText(pipeline, "serviceType");
String pipelineDomain = getDomainFQN(pipeline.path("domain"));

writeCsvRow(
csvWriter,
baseRow,
"",
"",
pipelineName,
pipelineType,
pipelineDescription,
pipelineOwners,
pipelineDomain,
pipelineServiceName,
pipelineServiceType);
LOG.debug("Exported Pipeline Information: {}", pipelineName);
}

private static void writeCsvRow(
CSVWriter csvWriter,
Map<String, String> baseRow,
@@ -43,6 +43,8 @@ import {
setupEntitiesForLineage,
verifyColumnLayerActive,
verifyColumnLayerInactive,
verifyColumnLineageInCSV,
verifyExportLineageCSV,
verifyNodePresent,
visitLineageTab,
} from '../../utils/lineage';
@@ -122,6 +124,13 @@ for (const EntityClass of entities) {
}
});

await test.step('Verify Lineage Export CSV', async () => {
await redirectToHomePage(page);
await currentEntity.visitEntityPage(page);
await visitLineageTab(page);
await verifyExportLineageCSV(page, currentEntity, entities, pipeline);
});

await test.step('Remove lineage between nodes for the entity', async () => {
await redirectToHomePage(page);
await currentEntity.visitEntityPage(page);
@@ -199,6 +208,13 @@ test('Verify column lineage between table and topic', async ({ browser }) => {

// Add column lineage
await addColumnLineage(page, sourceCol, targetCol);

// Verify column lineage
await redirectToHomePage(page);
await table.visitEntityPage(page);
await visitLineageTab(page);
await verifyColumnLineageInCSV(page, table, topic, sourceCol, targetCol);

await page.click('[data-testid="edit-lineage"]');

await removeColumnLineage(page, sourceCol, targetCol);
@@ -275,7 +291,6 @@ test('Verify column lineage between table and api endpoint', async ({
// Add column lineage
await addColumnLineage(page, sourceCol, targetCol);
await page.click('[data-testid="edit-lineage"]');

await removeColumnLineage(page, sourceCol, targetCol);
await page.click('[data-testid="edit-lineage"]');

152 changes: 152 additions & 0 deletions openmetadata-ui/src/main/resources/ui/playwright/utils/lineage.ts
@@ -12,6 +12,7 @@
*/
import { expect, Page } from '@playwright/test';
import { get } from 'lodash';
import { parseCSV } from '../../src/utils/EntityImport/EntityImportUtils';
import { ApiEndpointClass } from '../support/entity/ApiEndpointClass';
import { ContainerClass } from '../support/entity/ContainerClass';
import { DashboardClass } from '../support/entity/DashboardClass';
@@ -28,6 +29,38 @@ import {
toastNotification,
} from './common';

type LineageCSVRecord = {
fromEntityFQN: string;
fromServiceName: string;
fromServiceType: string;
toEntityFQN: string;
toServiceName: string;
toServiceType: string;
pipelineName: string;
};

export const LINEAGE_CSV_HEADERS = [
'fromEntityFQN',
'fromServiceName',
'fromServiceType',
'fromOwners',
'fromDomain',
'toEntityFQN',
'toServiceName',
'toServiceType',
'toOwners',
'toDomain',
'fromChildEntityFQN',
'toChildEntityFQN',
'pipelineName',
'pipelineType',
'pipelineDescription',
'pipelineOwners',
'pipelineDomain',
'pipelineServiceName',
'pipelineServiceType',
];

export const verifyColumnLayerInactive = async (page: Page) => {
await page.click('[data-testid="lineage-layer-btn"]'); // Open Layer popover
await page.waitForSelector(
@@ -473,3 +506,122 @@ export const verifyColumnLayerActive = async (page: Page) => {
await page.waitForSelector('[data-testid="lineage-layer-column-btn"].active');
await page.click('[data-testid="lineage-layer-btn"]'); // Close Layer popover
};

export const verifyCSVHeaders = async (page: Page, headers: string[]) => {
LINEAGE_CSV_HEADERS.forEach((expectedHeader) => {
expect(headers).toContain(expectedHeader);
});
};

export const getLineageCSVData = async (page: Page) => {
await page.getByTestId('lineage-export').click();

await expect(page.getByRole('dialog', { name: 'Export' })).toBeVisible();

const [download] = await Promise.all([
page.waitForEvent('download'),
page.click('button#submit-button'),
]);

const filePath = await download.path();

expect(filePath).not.toBeNull();

const fileContent = await download.createReadStream();

let fileData = '';
for await (const item of fileContent) {
fileData += item.toString();
}

const csvRows = fileData
.split('\n')
.map((row) => row.split(',').map((cell) => cell.replace(/"/g, '').trim()));

const headers = csvRows[0];
await verifyCSVHeaders(page, headers);

return parseCSV(csvRows);
};

export const verifyExportLineageCSV = async (
page: Page,
currentEntity: EntityClass,
entities: readonly [
TableClass,
DashboardClass,
TopicClass,
MlModelClass,
ContainerClass,
SearchIndexClass,
ApiEndpointClass,
MetricClass
],
pipeline: PipelineClass
) => {
const parsedData = await getLineageCSVData(page);
const currentEntityFQN = get(
currentEntity,
'entityResponseData.fullyQualifiedName'
);

const arr = [];
for (let i = 0; i < entities.length; i++) {
arr.push({
fromEntityFQN: currentEntityFQN,
fromServiceName: get(
currentEntity,
'entityResponseData.service.name',
''
),
fromServiceType: get(currentEntity, 'entityResponseData.serviceType', ''),
toEntityFQN: get(
entities[i],
'entityResponseData.fullyQualifiedName',
''
),
toServiceName: get(entities[i], 'entityResponseData.service.name', ''),
toServiceType: get(entities[i], 'entityResponseData.serviceType', ''),
pipelineName: get(pipeline, 'entityResponseData.name', ''),
});
}

arr.forEach((expectedRow: LineageCSVRecord) => {
const matchingRow = parsedData.find((row) =>
Object.keys(expectedRow).every(
(key) => row[key] === expectedRow[key as keyof LineageCSVRecord]
)
);

expect(matchingRow).toBeDefined(); // Ensure a matching row exists
});
};

export const verifyColumnLineageInCSV = async (
page: Page,
sourceEntity: EntityClass,
targetEntity: EntityClass,
sourceColFqn: string,
targetColFqn: string
) => {
const parsedData = await getLineageCSVData(page);
const expectedRow = {
fromEntityFQN: get(sourceEntity, 'entityResponseData.fullyQualifiedName'),
fromServiceName: get(sourceEntity, 'entityResponseData.service.name', ''),
fromServiceType: get(sourceEntity, 'entityResponseData.serviceType', ''),
toEntityFQN: get(targetEntity, 'entityResponseData.fullyQualifiedName', ''),
toServiceName: get(targetEntity, 'entityResponseData.service.name', ''),
toServiceType: get(targetEntity, 'entityResponseData.serviceType', ''),
fromChildEntityFQN: sourceColFqn,
toChildEntityFQN: targetColFqn,
pipelineName: '',
};

const matchingRow = parsedData.find((row) =>
Object.keys(expectedRow).every(
(key) => row[key] === expectedRow[key as keyof LineageCSVRecord]
)
);

expect(matchingRow).toBeDefined(); // Ensure a matching row exists
};
