Platform Engineering Team/Event Platform Value Stream/PoC Mediawiki Stream Enrichment
This page describes an implementation of T307959.
1. Code: https://gitlab.wikimedia.org/repos/data-engineering/mediawiki-stream-enrichment
2. Package: https://gitlab.wikimedia.org/repos/data-engineering/mediawiki-stream-enrichment/-/packages/234
3. Status: PoC
4. Deployment environment: YARN https://phabricator.wikimedia.org/T323914
Mediawiki Stream Enrichment
A proof-of-concept Flink service that consumes page_change events and produces wikitext-enriched events into page_content_change.
- Source schema: https://schema.wikimedia.org/repositories//primary/jsonschema/development/mediawiki/page/change/1.1.0.yaml
- Destination schema: https://schema.wikimedia.org/repositories//primary/jsonschema/development/mediawiki/page/change/1.1.0.yaml (the enriched stream reuses the page/change schema, with the wikitext content fields populated)
Consume enriched events
Enriched events are produced to eqiad.rc0.mediawiki.page_content_change, hosted on the Kafka Jumbo cluster.
Example:
kafkacat -C -b kafka-jumbo1001.eqiad.wmnet:9092 -t eqiad.rc0.mediawiki.page_content_change
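To spot-check the stream rather than tail it indefinitely, kafkacat can read a fixed number of messages and exit; piping through jq pretty-prints the JSON. A sketch, assuming kafkacat and jq are available on the host:

```shell
# Read the most recent event from the enriched topic and pretty-print it.
# The guard keeps the snippet harmless on hosts without kafkacat installed.
if command -v kafkacat >/dev/null 2>&1; then
  kafkacat -C \
    -b kafka-jumbo1001.eqiad.wmnet:9092 \
    -t eqiad.rc0.mediawiki.page_content_change \
    -o -1 -c 1 -e | jq .
else
  echo "kafkacat not found on this host" >&2
fi
```

Here -o -1 starts one message from the end of the partition and -c 1 exits after a single message, so the command returns immediately instead of blocking.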
Flink on YARN
The job has been tested on Flink 1.15.
tl;dr: the steps below are more or less automated by this unsupported script: https://gitlab.wikimedia.org/-/snippets/41.
A session cluster can be set up on a stat machine (atop YARN). First, download and unpack the Flink distribution:
wget https://dlcdn.apache.org/flink/flink-1.15.0/flink-1.15.0-bin-scala_2.12.tgz
tar xvzf flink-1.15.0-bin-scala_2.12.tgz
cd flink-1.15.0
The package target can be manually copied to a stat machine with:
scp target/enrichment-1.0-SNAPSHOT-jar-with-dependencies.jar stat1005.eqiad.wmnet:~/flink-1.15.0
Start a Flink cluster on YARN with
export HADOOP_CLASSPATH=`hadoop classpath`
./bin/yarn-session.sh --detached
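yarn-session.sh also accepts naming and sizing flags, which make the session easier to find in the YARN UI. A sketch (the memory values are illustrative, not a recommendation):

```shell
# Start a named, detached session cluster with explicit memory sizes.
# The guard keeps the snippet harmless on hosts without a Hadoop client.
if command -v hadoop >/dev/null 2>&1; then
  export HADOOP_CLASSPATH=$(hadoop classpath)
  ./bin/yarn-session.sh \
    -nm mediawiki-stream-enrichment \
    -jm 1024m \
    -tm 2048m \
    --detached
else
  echo "hadoop not on PATH; run this from a stat machine" >&2
fi
```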
The Flink dashboard is accessible from yarn.wikimedia.org. It allows monitoring job execution (Task Manager panel) and, if needed, stopping the job.
Job lifecycle management
Launch the job
Finally launch the job with
./bin/flink run -c org.wikimedia.mediawiki.event.enrichment.Enrichment enrichment-1.0-SNAPSHOT-jar-with-dependencies.jar
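After submitting, it can be useful to confirm the job is running and note its JobID for later lifecycle commands. A sketch, run from the Flink dist root:

```shell
# List running jobs on the session cluster; the JobID printed here is what
# later stop/cancel commands expect. Guarded so it is a no-op elsewhere.
if [ -x ./bin/flink ]; then
  ./bin/flink list -r   # -r: running jobs only
fi
```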
Restart the job
The job uses Kafka offsets to determine start (resume) points. A stopped job can be restarted; streaming will resume from the latest recorded Kafka offset.
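Beyond Kafka offsets, Flink can also stop a job with a savepoint and resume from it explicitly. A sketch using the standard Flink CLI; the JobID and HDFS path below are hypothetical placeholders, not values used by this deployment:

```shell
JOB_ID="<jobId>"                              # placeholder: take it from './bin/flink list'
SAVEPOINT_DIR="hdfs:///tmp/flink-savepoints"  # illustrative path, not a convention
if [ -x ./bin/flink ]; then
  # Stop the job, writing a savepoint first.
  ./bin/flink stop --savepointPath "$SAVEPOINT_DIR" "$JOB_ID"
  # Resume from a specific savepoint (its path is printed by the stop command).
  ./bin/flink run -s "$SAVEPOINT_DIR/savepoint-xxxx" \
    -c org.wikimedia.mediawiki.event.enrichment.Enrichment \
    enrichment-1.0-SNAPSHOT-jar-with-dependencies.jar
fi
```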
YARN deployment (long-lived Kerberos ticket)
Currently Mediawiki Stream Enrichment runs as the `analytics` user in the YARN production queue. This deployment consists of a session cluster and the job itself.
Startup scripts can be found at:
- Download and setup Flink 1.15 https://gitlab.wikimedia.org/-/snippets/45
- Start Flink Session cluster and enrichment job (latest release) https://gitlab.wikimedia.org/-/snippets/43#LC5
By default a Flink dist is set up in `/tmp` on `an-launcher1002`. This is an intentionally ephemeral installation: upon server restart, the Flink cluster and the enrichment job will need to be redeployed.
View the output of a Flink job at the command line
On YARN, stdout is directed to the job's container and won't be visible from the CLI. We can display a container's output by accessing its logs with
yarn logs -applicationId <applicationId> -containerId <containerId>
Where
- <applicationId> is the Flink cluster id returned by yarn-session.sh, and visible at https://yarn.wikimedia.org.
- <containerId> is the container running a specific task, that you can find in Flink's Task Manager at https://yarn.wikimedia.org/proxy/<applicationId>/#/task-manager.
For more details see the project documentation. The Flink web interface is available at https://yarn.wikimedia.org/proxy/<applicationId>.
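If you only know the session cluster by name, the applicationId can also be recovered from the command line. A sketch, assuming the yarn client is on PATH:

```shell
# List running YARN applications and look for the Flink session.
# Guarded (and grep failure tolerated) so the snippet exits cleanly anywhere.
if command -v yarn >/dev/null 2>&1; then
  yarn application -list -appStates RUNNING | grep -i flink || true
fi
```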
Config
There are a couple of gotchas.
JVM
We need to rewrite the Host HTTP header to properly route HTTP requests from the internal YARN cluster to https://api-ro.discovery.wmnet. To do so, the JVM HTTP client must be configured to allow restricted headers.
Add the following to conf/flink-conf.yaml:
env.java.opts: -Dsun.net.http.allowRestrictedHeaders=true
Kerberos
Kerberos authentication is required to access WMF Analytics resources. The relevant config settings are found in conf/flink-conf.yaml:
security.kerberos.login.use-ticket-cache: true
# security.kerberos.login.keytab:
security.kerberos.login.principal: krbtgt/WIKIMEDIA@WIKIMEDIA
# The configuration below defines which JAAS login contexts
security.kerberos.login.contexts: Client,KafkaClient
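Since use-ticket-cache is enabled, the cluster will only authenticate if a valid ticket is already in the cache. A quick pre-flight check, assuming the Kerberos client tools are installed:

```shell
# Show the current ticket cache; suggest kinit if it is empty or expired.
if command -v klist >/dev/null 2>&1; then
  klist || echo "no valid Kerberos ticket; run kinit first" >&2
else
  echo "klist not found on this host" >&2
fi
```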
Scala-free Flink
The flink-scala dependencies must be removed from the Flink distribution; as of 1.15 we run Flink Scala-free. See https://flink.apache.org/2022/02/22/scala-free.html.
rm flink-1.15/lib/flink-scala*
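A quick way to confirm the distribution is actually Scala-free after the removal (the path matches the dist layout used above):

```shell
# Remove the bundled flink-scala artifacts, then verify none remain in lib/.
rm -f flink-1.15/lib/flink-scala*
if ls flink-1.15/lib/flink-scala* >/dev/null 2>&1; then
  echo "flink-scala jars still present" >&2
else
  echo "lib/ is free of flink-scala jars"
fi
```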