Neo4j 从数据库 和 CSV 中导入数据
一、示例数据库
NorthWind 下载地址 Neo4j 数据集案例中的 NorthWind 数据库文件。
选择 Postgresql 版本的数据:northwind.postgre.sql
数据库结构:
主要用到表 customers、suppliers、products、employees 和 categories。
二、建立图模型
从关系模型中导出图模型时,我们应牢记以下准则:
- A row is a node
- A table name is a label name
在此数据集中,以下图形模型用作第一次迭代:
图模型与关系模型有何不同?
- 没有null
- 在关系版本中,为了跟踪员工层次结构,如果他们不向任何人报告,我们在“ReportsTo”列中有一个空条目。 在图形版本中,我们只是没有定义关系。
- 现有值条目(属性)不存在。
- 它更详细地描述了这种关系。 例如,我们知道员工销售订单而不是在订单和员工表之间建立外键关系。 我们还可以根据需要选择添加关于该关系的更多元数据。
- 它通常会更加规范化。 例如,地址已经在几个表中被非规范化,但是在图模型的未来版本中,我们可能选择以自己的权限创建地址节点。
三、导入 Postgresql 数据库
为了可以远程访问,方便操作数据,和可以导入SQL脚本,需要修改配置文件。
修改后重启 systemctl restart postgresql.service
。
把 sql 脚本放到 /root/Documents 下:
对于postgresql导入sql文件的简单命令为:
psql -U username -W -d schemaname -f file.sql
四、导出 CSV 文件
通过定义的图模型,我们一共用到5张表,分别为 employees、orders、products、suppliers 和 categories。 把这几张表导出为CSV文件,注意,CSV 文件要带字段名称头的行,第一行为字段列。
现在我们知道了我们想要的图形,我们需要从 PostgreSQL 中提取数据,以便我们可以将其创建为图形。 最简单的方法是以 CSV 格式导出相应的表格。 PostgreSQL Copy
命令允许我们执行 SQL 查询并将结果写入 CSV 文件,例如 使用 psql -d northwind < export_csv.sql
:
export_csv.sql文件:
COPY (SELECT * FROM customers) TO '/tmp/customers.csv' WITH CSV header;
COPY (SELECT * FROM suppliers) TO '/tmp/suppliers.csv' WITH CSV header;
COPY (SELECT * FROM products) TO '/tmp/products.csv' WITH CSV header;
COPY (SELECT * FROM employees) TO '/tmp/employees.csv' WITH CSV header;
COPY (SELECT * FROM categories) TO '/tmp/categories.csv' WITH CSV header;
COPY (SELECT * FROM orders
LEFT OUTER JOIN order_details ON order_details.OrderID = orders.OrderID) TO '/tmp/orders.csv' WITH CSV header;
五、使用 Cypher 导入 CSV 文件到 Neo4J
在我们从 Postgresql 导出数据后,我们将使用 Cypher 的 LOAD CSV
命令将 CSV 文件的内容转换为图形结构。
高能预警:默认情况下,neo4j 需要将待导入的 csv 文件放置在安装目录下的 import
文件夹下,使用 file:///
形式的协议去访问待导入文件。
如果嫌弃麻烦,也可以更改配置项 dbms.directories.import
值为磁盘文件夹绝对路径,便可以通过此文件夹路径来访问。
具体可参考 neo4j load csv 使用注意项指南。
第一步:创建节点
import_csv.cypher 文件:
// Create customers
USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM "file:///customers.csv" AS row
CREATE (:Customer {companyName: row.CompanyName, customerID: row.CustomerID, fax: row.Fax, phone: row.Phone});
// Create products
USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM "file:///products.csv" AS row
CREATE (:Product {productName: row.ProductName, productID: row.ProductID, unitPrice: toFloat(row.UnitPrice)});
// Create suppliers
USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM "file:///suppliers.csv" AS row
CREATE (:Supplier {companyName: row.CompanyName, supplierID: row.SupplierID});
// Create employees
USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM "file:///employees.csv" AS row
CREATE (:Employee {employeeID:row.EmployeeID, firstName: row.FirstName, lastName: row.LastName, title: row.Title});
// Create categories
USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM "file:///categories.csv" AS row
CREATE (:Category {categoryID: row.CategoryID, categoryName: row.CategoryName, description: row.Description});
USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM "file:///orders.csv" AS row
MERGE (order:Order {orderID: row.OrderID}) ON CREATE SET order.shipName = row.ShipName;
接下来,我们将在刚刚创建的节点上创建索引,以确保在下一步创建关系时快速查找。
CREATE INDEX ON :Product(productID);
CREATE INDEX ON :Product(productName);
CREATE INDEX ON :Category(categoryID);
CREATE INDEX ON :Employee(employeeID);
CREATE INDEX ON :Supplier(supplierID);
CREATE INDEX ON :Customer(customerID);
CREATE INDEX ON :Customer(customerName);
CREATE CONSTRAINT ON (o:Order) ASSERT o.orderID IS UNIQUE;
由于索引是在插入节点之后创建的,因此它们的数量是异步发生的,因此我们使用 schema await(shell命令)来阻塞它们,直到它们被填充为止。
schema await
初始节点和索引到位后,我们现在可以创建订单与产品和员工的关系。
USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM "file:///orders.csv" AS row
MATCH (order:Order {orderID: row.OrderID})
MATCH (product:Product {productID: row.ProductID})
MERGE (order)-[pu:PRODUCT]->(product)
ON CREATE SET pu.unitPrice = toFloat(row.UnitPrice), pu.quantity = toFloat(row.Quantity);
USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM "file:///orders.csv" AS row
MATCH (order:Order {orderID: row.OrderID})
MATCH (employee:Employee {employeeID: row.EmployeeID})
MERGE (employee)-[:SOLD]->(order);
USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM "file:///orders.csv" AS row
MATCH (order:Order {orderID: row.OrderID})
MATCH (customer:Customer {customerID: row.CustomerID})
MERGE (customer)-[:PURCHASED]->(order);
接下来,创建产品,供应商和类别之间的关系:
USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM "file:///products.csv" AS row
MATCH (product:Product {productID: row.ProductID})
MATCH (supplier:Supplier {supplierID: row.SupplierID})
MERGE (supplier)-[:SUPPLIES]->(product);
USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM "file:///products.csv" AS row
MATCH (product:Product {productID: row.ProductID})
MATCH (category:Category {categoryID: row.CategoryID})
MERGE (product)-[:PART_OF]->(category);
最后,我们将在员工之间创建 REPORTS_TO
关系来表示报告结构:
USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM "file:///employees.csv" AS row
MATCH (employee:Employee {employeeID: row.EmployeeID})
MATCH (manager:Employee {employeeID: row.ReportsTo})
MERGE (employee)-[:REPORTS_TO]->(manager);
也可以使用一次运行整个脚本 bin/neo4j-shell -path northwind.db -file import_csv.cypher
。
生成的图形应如下所示:
我们现在可以查询结果图。
六、查询图
Which Employee had the Highest Cross-Selling Count of 'Chocolade' and Which Product?
MATCH (choc:Product {productName:'Chocolade'})<-[:PRODUCT]-(:Order)<-[:SOLD]-(employee),
(employee)-[:SOLD]->(o2)-[:PRODUCT]->(other:Product)
RETURN employee.employeeID, other.productName, count(distinct o2) as count
ORDER BY count DESC
LIMIT 5;
看起来1号员工很忙!
How are Employees Organized? Who Reports to Whom?
MATCH path = (e:Employee)<-[:REPORTS_TO]-(sub)
RETURN e.employeeID AS manager, sub.employeeID AS employee;
请注意,5号员工有人向他们报告,但也向2号员工报告。
Which Employees Report to Each Other Indirectly?
MATCH path = (e:Employee)<-[:REPORTS_TO*]-(sub)
WITH e, sub, [person in NODES(path) | person.employeeID][1..-1] AS path
RETURN e.employeeID AS manager, sub.employeeID AS employee, CASE WHEN LENGTH(path) = 0 THEN "Direct Report" ELSE path END AS via
ORDER BY LENGTH(path);
How Many Orders were Made by Each Part of the Hierarchy?
MATCH (e:Employee)
OPTIONAL MATCH (e)<-[:REPORTS_TO*0..]-(sub)-[:SOLD]->(order)
RETURN e.employeeID, [x IN COLLECT(DISTINCT sub.employeeID) WHERE x <> e.employeeID] AS reports, COUNT(distinct order) AS totalOrders
ORDER BY totalOrders DESC;
七、更新图
现在,如果我们想要更新图形数据,我们必须首先找到相关信息,然后更新或扩展图形结构。
Janet is now reporting to Steven
我们需要首先找到 Steven,以及 Janet 和她的 REPORTS_TO 关系。 然后我们删除现有关系并为 Steven 创建一个新关系。
MATCH (mgr:Employee {EmployeeID:5})
MATCH (emp:Employee {EmployeeID:3})-[rel:REPORTS_TO]->()
DELETE rel
CREATE (emp)-[:REPORTS_TO]->(mgr)
RETURN *;
只需更新组织层次结构的一部分,即可进行单一关系更改。 所有后续查询将立即使用新结构。
八、拓展阅读
From SQL to Cypher – A hands-on Guide
https://neo4j.com/developer/guide-sql-to-cypher/
importing CSV Data into Neo4j
https://neo4j.com/developer/guide-import-csv/
Tool: SQL to Neo4j Import
https://neo4j.com/blog/loading-sql-neo4j-like-magic/
九、API 导入简单示例
@Test
void testInsert()
{
Driver driver = GraphDatabase.driver("bolt://192.190.10.170:7687", AuthTokens.basic("neo4j", "neo4j"));
try (Session session = driver.session())
{
int index = 1;
while (index < 100000000)
{
try
{
String cql = "CREATE (book:Book {page:$page, title:$title, author:$author})";
session.run(cql, parameters("page", index, "title", "Learning " + index, "author", "Liu " + index));
if (index % 10000 == 0)
{
System.out.println("done book " + index);
}
index++;
}
catch (Exception e)
{
System.out.println("error happened at index " + index);
}
}
}
}
这种方式造数据比较方便!