Bewirtschaftung von Data Lakesin der Praxis
Hans-Peter Zorn, Dr. Dominik Benz, inovex GmbH
data2day Karlsruhe, 05.10.2016
Ein See, ein See, ... wozu?
http://p5.focus.de/img/fotos/crop287471/1000162586-cfreecrop-w960-h541-ocx0_y63-q75-p5/seeelefant.jpghttps://c2.staticflickr.com/6/5321/9283741715_ba20166c37_b.jpg
Unterschiedliche Datenquellen,
Formate Integration, Verknüpfung
Kosteneffiziente Langzeit-
archivierung
OffloadingVorverarbeitung
DWH
Zentrale Anlaufstelle für
Daten
2
Inspiration für neue
Datenprodukte
Spielwiese für Analysten,
Exploration
3
Der Weg zum See
Definition, Herausforderungen
Werkzeuge (Airflow / Schedoscope)
Lösungsansätze
Vergleich & Empfehlungen
Typischer Aufbau
raw processed datahub analysisingress egress
scheduling, orchestration, metadata
user access, system integration,development
4
5
Herausforderungen
raw processed datahub analysisingress egress
Scheduling, orchestration, metadata
user access, system integration,development
Agile Anbindung
Modellierung Abhängigkeiten
Testing der Transformationen
Behandlung spät ankommende
Daten
› Kleinste Einheit: Artefakt von zusammengehörigen Daten („Tabelle“)
› Jedes Artefakt ist als Hive-Tabelle registriert
› Bewirtschaftung = (Neu)Erstellung der Artefakte
› Berücksichtigung von Abhängigkeiten bei der Bewirtschaftung
› Fokus: kontinuierlich neue Daten (Batches, Daten in Zeitscheiben)
6
Definition
Data Lake in unserem Kontext:
7
Der Weg zum See
Werkzeuge (Airflow / Schedoscope)
Lösungsansätze
Vergleich & Empfehlungen
› Interne Datenplattformen u.a.bei Otto Group BI
› Gestartet mit Oozie; Probleme:› Langsame Entwicklungszyklen
› Viele redundante XML-Dateien
› Unzureichende Clusterauslastung
› Fehleranfällig
› Fehlersuche schwierig
8
Ausgangslage Werkzeugeeasy statt oozie J
› „Schema is code“
› Goal-driven scheduling
› Open Source, Scala (Otto Group BI)
Airflow
› „Workflow is code“
› Dynamische Daten-Pipelines
› Open Source, Python (AirBnB)
9
s
Agile Anbindung
› Definition von „Views“ (Schema)
› Statisch typisierte Scala-DSL
› Agiles, schnelles Erstellen neuer Quellen & Ansichten
Agile Anbindung
› Abhängigkeiten Teil der View-Definition
› Integriertes Unit-Test Framework
› Entwickelt seit 2015 bei Otto Group BI
› Open Source (Apache SL 2.0) Agile Anbindung› Ziel-getriebene Beladung
(„Materialized Views on Demand“)
10
s
Agile Anbindung
case class Nodes(year: Parameter[String],month: Parameter[String]) extends Viewwith MonthlyParameterizationwith Idwith PointOccurrence {val version = fieldOf[Int]val user_id = fieldOf[Int]val longitude = fieldOf[Double]val latitude = fieldOf[Double]
comment("View of nodes partitioned by year andmonth with tags and geohash")
storedAs(Parquet())}
11
s
Agile Anbindung
case class Nodes(...
val version = fieldOf[Int]val user_id = fieldOf[Int]val longitude = fieldOf[Double]val latitude = fieldOf[Double]
transformVia(() =>HiveTransformation(insertInto(this,queryFromResource(“hql/insertnodes.hql")))
)
...}
12
Airflow
Agile Anbindung
› Schema der (Hive-)Tabellen extern verwaltet (z.B. mit db-deploy)
› Schema-Änderungen müssen manuell nachgezogen werden
Agile Anbindung
› Definition von Daten-Pipelines/DAGs mit Python
› Dynamische Spezifikation der Abhängigkeiten
› Transformationen via Operatoren
› Entwickelt bei AirBnB
› Seit Juni 2015 Open Source
Agile Anbindung› Zeitbasiertes Scheduling
› Konzept von „Sensoren“
13
Agile Anbindung
Airflow
upload_data = BashOperator( task_id='upload_data', bash_command='hadoop fs –put data.csv /data', dag=dag)
process_data = HiveOperator( task_id='process_data', sql='hive_script.sql', dag=dag)
upload_data.set_downstream(process_data)
Vergleich
Oozie Airflow Schedoscope
Sprache Java Python Scala
Workflow-Spezifikation XML Python Scala
Definition Abhängigkeiten
Statisch (innerhalb XML); Definition „globale Struktur“
Dynamisch (via Python); Definition „globale Struktur“ (= DAG)
Dynamisch; Definition„lokal“ (pro View)
Schema-Management extern extern intern (schema is code)
Change-Management Transformationen
nein nein ja
Test-Framework nein nein ja
Scheduling-Optionen data, time data (via Sensors), time Ziel / Goal
Sonstige Features Coordinators, Bundles, ... GUI für Monitoring/Config; Plugin-Erweiterbarkeit;SLA-Management, ...
GUI (Metascope);Anbindung an externe Dienste (Redis,...); ...
14
15
GUIsAirflow
(Metascope)
16
Der Weg zum See
Lösungsansätze
Vergleich & Empfehlungen
Agilität
Entwicklung:• Schema• Transformation• Workflow
Deploymentdev
Testen
Deploymentprod
Schema, SQL/Spark, Oozie-
XML
Maven, Jenkins,..FitNesse,
Explorativ, Tests mit Hive
› Viele Schrauben› Viele
Fehlerquellen
17
› Data Engineering == Softwareentwicklung› Best Practices ähnlich
18
Agilität
Wie wird man schneller?
Agile Anbindung
› Weniger unterschiedliche Tools weniger Konfigurationen an zu passen
Agile Anbindung
› Früh testen› Zeit bis zum entdeckten Fehler
reduzieren
› Continuous Integration/Delivery› Automatisierung!
19
Testing› Tippfehler (im XML, SQL)› Logische Fehler› Schema-Änderungen in
vorgelagerten Tabellen› Änderungen im Verhalten
der Plattform, ...
end-to-end (zB FitNesse)› Langsam, Tests getrennt
von Codebase› trotzdem wichtig!
isoliert („unit-test style“)› define input, run
transformation, check output
20
Testing
Schedoscope
1
1
1
3
3
2
"processed.Nodes" should "load correctly" in {new Nodes(p("2013"), p("06")) with test {basedOn(nodeTags, nodes)then()numRows shouldBe 1row(v(id) shouldBe "122318",v(occurredAt) shouldBe "2013-06-17 15:49:26Z",v(version) shouldBe 6,v(user_id) shouldBe 50299,
21
Testing
Airflow
1
1 3
Agile Anbindung
› Kein integriertes Testen› Einzelne Tasks können über CLI separat ausgeführt werden
(„airflow test“)› Keine direkte Unterstützung zur Erzeugung von
Eingabedaten, Prüfen von Assertions› Aber: Python-Bordmittel (pyunit, ...) können nahtlos
angebunden werden› Zügige manuelle Erstellung einer Testumgebung
22
Testing
Airflow - Code
1
1 3
processed_clicklog = HiveTestDataHook("src_omniture_clicklog", "version='post',report_suite='testsuite',day=20151014")
for i in range(0,10): processed_clicklog.add_row({"evar": {1:"evar_1_val", 2:"evar_2_val"}})
processed_clicklog.write()
self.run_task("omniture", "processed_to_datahub_events_pbox", "2015-10-14")
assertEquals(len(datahub_events_pbox.get_contents()), 17)
1
2
3
23
Abhängigkeiten
Wer A sagt, muss auch B sagen
Agile Anbindung
› Bereinigte Views basieren auf Rohdaten› Analytische Views <- Processed› Aggregationen und KPI› Export in Serving oder Analyische DB (Exasol, Redis)
› Joins› Aggregate über Zeiträume› Sequentielle Verarbeitung, abhängig vom Vortag
24
Abhängigkeiten
Beispiel Schedoscope
case class Nodes(year: Parameter[String],month: Parameter[String]) extends View
with JobMetadata {...dependsOn(() => NodesWithGeohash(p(year), p(month)))dependsOn(() => NodeTags(p(year), p(month)))
comment("View of nodes partitioned by year and month withtags and geohash")
storedAs(Parquet())}
Agile AnbindungDynamisch, frei definierbar
25
Abhängigkeiten
Beispiel Airflow
upload_data = BashOperator( task_id=‘upload_data', bash_command='hadoop fs –put data.csv
/data', dag=dag)
process_data = HiveOperator( task_id='process_data', sql=‘hive_script.sql', dag=dag)
upload_data.set_downstream(process_data)
Dynamische Instantiierung der Operatoren
Dynamische Abhängigkeiten
26
Late arriving data
Lieber spät als nie
5.10.2016 21:59
5.10.2016 18:00
5.10.2016 18:00
5.10.2016 18:00
5.10.2016 18:00
5.10.2016 18:00
4.10.2016 23:02
4.10.2016 22:01
Batch: 6.10.2016
> Zeitzonen> techn. Verzögerung> falsche Systemzeit
27
Late Arriving Data
5.10.2016
3.10.2016
4.10.2016
3.10.2016
5.10.2016
4.10.2016
3.10.2016
5.10.2016
INSERT OVERWRITE+ Idempotenz- Daten ggf müssen wiederholt verarbeitet werden
Ingest-Time Event-Time
Dynamic Partitions
+ Einfach+ Einmalige Verarbeitung- nicht idempotent- Abhängigkeiten
28
Der Weg zum See
Agilität, Testing, Abhängigkeiten, Late
arriving data
Vergleich & Empfehlungen
29
Zusammenfassung
Schedoscope Airflow
• Umfassendes Tool• Testframework• Schema-Management• Export-Funktionen (jdbc,
Kafka)• Metadaten: Metascope
• Leichtgewichtiger Ansatz• Workflow is code• erweiterbar• Komfortable GUIs• Community (seit April im
Apache Incubator)
• kleine Community• Lernkurve• Alles-oder-nichts
• Überschaubares Set an Standard-Operatoren
• Verteilung erfordert zusätzliche Infrastruktur
• Schema-/Change-Management aussen vorDemo gerne zwischen
den Talks!
Vielen Dank
Hans-Peter Zorn
Dr. Dominik Benz
inovex GmbH
Park Plaza
Ludwig-Erhard-Allee 6
76131 Karlsruhe