PoC Mediawiki Stream Enrichment
This page describes an implementation of T307959.
1. Code: https://gitlab.wikimedia.org/repos/data-engineering/mediawiki-stream-enrichment
2. Package: https://gitlab.wikimedia.org/repos/data-engineering/mediawiki-stream-enrichment/-/packages/234
3. Status: PoC
4. Deployment environment: YARN https://phabricator.wikimedia.org/T323914
Mediawiki Stream Enrichment
A proof of concept Flink service that consumes page_change events and produces wikitext-enriched events in page_content_change.
- Source schema: https://schema.wikimedia.org/repositories//primary/jsonschema/development/mediawiki/page/change/1.1.0.yaml
- Destination schema: https://schema.wikimedia.org/repositories//primary/jsonschema/development/mediawiki/page/change/1.1.0.yaml
Consume enriched events
Enriched events are produced to eqiad.rc0.mediawiki.page_content_change, brokered by Kafka Jumbo.
Example:
kafkacat -C -b kafka-jumbo1001.eqiad.wmnet:9092 -t eqiad.rc0.mediawiki.page_content_change
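To spot-check the stream, you can read just the most recent event and pretty-print it (this assumes jq is installed on the host):
# Read the last message on the topic, then exit
kafkacat -C -b kafka-jumbo1001.eqiad.wmnet:9092 -t eqiad.rc0.mediawiki.page_content_change -o -1 -c 1 -e | jq .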
Flink on YARN
The job has been tested on Flink 1.15.
tl;dr: the steps below are more or less automated by this unsupported script: https://gitlab.wikimedia.org/-/snippets/41.
A Flink session cluster can be set up from a stat machine (atop YARN) with:
wget https://dlcdn.apache.org/flink/flink-1.15.0/flink-1.15.0-bin-scala_2.12.tgz
tar xvzf flink-1.15.0-bin-scala_2.12.tgz
cd flink-1.15.0
export HADOOP_CLASSPATH=`hadoop classpath`
./bin/yarn-session.sh --detached
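The session cluster above uses Flink's default sizing. If you need to size it explicitly, yarn-session.sh accepts standard resource flags; the values below are illustrative, not the production settings:
# Named session with explicit JobManager/TaskManager memory and slots (illustrative values)
./bin/yarn-session.sh -nm mediawiki-stream-enrichment -jm 1024m -tm 4096m -s 2 --detached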
The package target can be manually copied to a stat machine with:
scp target/enrichment-1.0-SNAPSHOT-jar-with-dependencies.jar stat1005.eqiad.wmnet:~/flink-1.15.0
Start a Flink cluster on YARN with
export HADOOP_CLASSPATH=`hadoop classpath`
./bin/yarn-session.sh --detached
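yarn-session.sh prints the YARN application id on startup; if you lose it, you can recover it by listing running YARN applications:
yarn application -list | grep -i flink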
From yarn.wikimedia.org you can access the Flink dashboard. This allows monitoring job execution (Task Manager panel) and, if needed, stopping the job.
Job lifecycle management
Launch the job
Finally, launch the job with:
./bin/flink run -c org.wikimedia.mediawiki.event.enrichment.Enrichment enrichment-1.0-SNAPSHOT-jar-with-dependencies.jar
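flink run as invoked above stays attached to your terminal. For a long-running job you may prefer to submit it detached and then confirm it is running (standard Flink CLI flags):
# Submit detached, then list jobs running on the session cluster
./bin/flink run -d -c org.wikimedia.mediawiki.event.enrichment.Enrichment enrichment-1.0-SNAPSHOT-jar-with-dependencies.jar
./bin/flink list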
Restart the job
The job uses Kafka offsets to determine start (resume) points. A stopped job can be restarted; streaming will resume from the latest recorded Kafka offset.
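A minimal manual restart sequence, relying on those committed Kafka offsets for the resume point, could look like this (the job id comes from flink list):
./bin/flink list
./bin/flink cancel <jobId>
./bin/flink run -c org.wikimedia.mediawiki.event.enrichment.Enrichment enrichment-1.0-SNAPSHOT-jar-with-dependencies.jar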
YARN deployment (long-lived Kerberos ticket)
Currently Mediawiki Stream Enrichment runs as an `analytics` job in the YARN production queue. This deployment consists of a session cluster and the job itself.
Startup scripts can be found at:
- Download and setup Flink 1.15 https://gitlab.wikimedia.org/-/snippets/45
- Start Flink Session cluster and enrichment job (latest release) https://gitlab.wikimedia.org/-/snippets/43#LC5
By default a Flink dist is set up in `/tmp` on `an-launcher1002`. This is an intentionally ephemeral installation: upon server restart, the Flink cluster and the enrichment job will need to be redeployed.
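To check whether the ephemeral install and the session cluster are still in place (for example after a reboot), something like the following works; the exact path under /tmp is an assumption:
ls /tmp/flink-1.15* || echo "Flink dist missing: re-run the setup snippet"
yarn application -list | grep -i flink || echo "no Flink session running: redeploy"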
View the output of a Flink job at the command line
On YARN, stdout is directed to the job's container and won't be visible from the CLI. We can display container output by accessing its logs with
yarn logs -applicationId <applicationId> -containerId <containerId>
Where
- <applicationId> is the Flink cluster id returned by yarn-session.sh, and visible at https://yarn.wikimedia.org.
- <containerId> is the container running a specific task, which you can find in Flink's Task Manager at https://yarn.wikimedia.org/proxy/<applicationId>/#/task-manager.
For more details see the project doc. The Flink Web Interface is available under https://yarn.wikimedia.org/proxy/<applicationId>.
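For example, with placeholder IDs (yours will differ; copy the real ones from yarn.wikimedia.org):
# Both IDs below are made up for illustration
yarn logs -applicationId application_1668496000000_01234 -containerId container_e05_1668496000000_01234_01_000002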
Config
There are a couple of gotchas.
JVM
We need to rewrite the Host HTTP header to properly route HTTP requests from the internal YARN cluster to https://api-ro.discovery.wmnet.
To do so, we need to configure the JVM HTTP client to allow restricted headers.
Add the following to conf/flink-conf.yaml:
env.java.opts: -Dsun.net.http.allowRestrictedHeaders=true
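Flink echoes the effective JVM options in its startup logs, so one way to confirm the setting was picked up after restarting the session (assuming the property appears in the container logs) is:
yarn logs -applicationId <applicationId> | grep allowRestrictedHeaders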
Kerberos
Kerberos authentication is required to access WMF Analytics resources. The relevant config settings are found in conf/flink-conf.yaml:
security.kerberos.login.use-ticket-cache: true
# security.kerberos.login.keytab:
security.kerberos.login.principal: krbtgt/WIKIMEDIA@WIKIMEDIA
# The configuration below defines which JAAS login contexts
security.kerberos.login.contexts: Client,KafkaClient
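Since security.kerberos.login.use-ticket-cache is true, the client reads credentials from your local Kerberos ticket cache. Before starting the session cluster, check that you hold a valid ticket (the principal below is a placeholder):
klist
kinit your_user@WIKIMEDIA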
Scala-free Flink
The flink-scala dependencies must be removed from the Flink distribution. As of 1.15 we run Flink Scala-free. See https://flink.apache.org/2022/02/22/scala-free.html.
rm flink-1.15/lib/flink-scala*
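A quick sanity check that no Scala artifacts remain in the distribution after the removal:
ls flink-1.15/lib | grep -i scala || echo "no scala jars left"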