PoC Mediawiki Stream Enrichment

This page describes an implementation of T307959.

1. Code: https://gitlab.wikimedia.org/repos/data-engineering/mediawiki-stream-enrichment

2. Package: https://gitlab.wikimedia.org/repos/data-engineering/mediawiki-stream-enrichment/-/packages/234

3. Status: PoC

4. Deployment environment: YARN (https://phabricator.wikimedia.org/T323914)

Mediawiki Stream Enrichment

A proof-of-concept Flink service that consumes page_change events and produces wikitext-enriched events in page_content_change.

Consume enriched events

Enriched events are produced to the eqiad.rc0.mediawiki.page_content_change topic, brokered by the Kafka Jumbo cluster.

Example:

kafkacat -C -b kafka-jumbo1001.eqiad.wmnet:9092 -t eqiad.rc0.mediawiki.page_content_change
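
For a quick spot check, the most recent event can be consumed and pretty-printed. This is a minimal sketch assuming jq is installed on the host; -o -1, -c 1 and -e are standard kafkacat options (start one message from the end, consume one message, exit):

# Read the last message on the topic and pretty-print it
kafkacat -C -b kafka-jumbo1001.eqiad.wmnet:9092 -t eqiad.rc0.mediawiki.page_content_change -o -1 -c 1 -e | jq .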

Flink on YARN

The job has been tested on Flink 1.15.

tl;dr: the steps below are more or less automated by this unsupported script: https://gitlab.wikimedia.org/-/snippets/41.

A Flink distribution can be set up locally on a stat machine with

wget https://dlcdn.apache.org/flink/flink-1.15.0/flink-1.15.0-bin-scala_2.12.tgz

tar xvzf flink-1.15.0-bin-scala_2.12.tgz

cd flink-1.15.0

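The jar name follows the Maven assembly plugin's jar-with-dependencies convention, so the package target below can presumably be built from a checkout of the repository with (an assumption; check the repository's own build instructions):

# Build the jar-with-dependencies artifact into target/
mvn clean package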

The package target can be manually copied to a stat machine with:

scp target/enrichment-1.0-SNAPSHOT-jar-with-dependencies.jar stat1005.eqiad.wmnet:~/flink-1.15.0

Start a Flink cluster on YARN with

export HADOOP_CLASSPATH=`hadoop classpath`

./bin/yarn-session.sh --detached

From yarn.wikimedia.org you can access the Flink dashboard. This allows monitoring job execution (Task Manager panel) and, if needed, stopping the job.
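
The session's <applicationId> can also be retrieved from the command line with the standard YARN CLI:

# List running YARN applications, including the Flink session cluster and its application id
yarn application -list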

Job lifecycle management

Launch the job

Finally, launch the job with

./bin/flink run -c org.wikimedia.mediawiki.event.enrichment.Enrichment enrichment-1.0-SNAPSHOT-jar-with-dependencies.jar
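
To verify that the job was submitted, the standard Flink CLI can list running jobs against the session cluster:

# List running and scheduled jobs together with their job ids
./bin/flink list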

Restart the job

The job uses Kafka offsets to determine start (resume) points. A stopped job can be restarted; streaming will resume from the latest recorded Kafka offset.
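
As a sketch of the full restart cycle, using the standard Flink CLI (<jobId> is a placeholder for the id printed by flink list):

# Find the running job's id, cancel it, then resubmit; recorded Kafka offsets are retained
./bin/flink list
./bin/flink cancel <jobId>
./bin/flink run -c org.wikimedia.mediawiki.event.enrichment.Enrichment enrichment-1.0-SNAPSHOT-jar-with-dependencies.jar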

YARN deployment (long-lived Kerberos ticket)

Currently Mediawiki Stream Enrichment runs as an `analytics` job in the YARN production queue. This deployment consists of a session cluster and the job itself.

Startup scripts can be found at:

By default a Flink dist is set up in `/tmp` on `an-launcher1002`. This installation is intentionally ephemeral: upon server restart, the Flink cluster and the enrichment job will need to be re-deployed.
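
A quick way to check whether the ephemeral installation is still in place (the exact path under /tmp is an assumption):

# If this prints nothing, the Flink dist is gone and needs to be re-deployed
ls -d /tmp/flink-* 2>/dev/null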

View the output of a Flink job at the command line

On YARN, stdout is directed to the job's container and won't be visible from the CLI. We can display container output by accessing its logs with

yarn logs -applicationId <applicationId> -containerId <containerId>

Where

  • <applicationId> is the Flink cluster id returned by yarn-session.sh, and visible at https://yarn.wikimedia.org.
  • <containerId> is the container running a specific task, which you can find in Flink's Task Manager at https://yarn.wikimedia.org/proxy/<applicationId>/#/task-manager.
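
Put together, with purely hypothetical ids for illustration:

# Both ids below are made-up examples; substitute the real ones
yarn logs -applicationId application_1670000000000_12345 -containerId container_e42_1670000000000_12345_01_000002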

For more details see the project doc. The Flink Web Interface will be available at yarn.wikimedia.org under https://yarn.wikimedia.org/proxy/<applicationId>.

Config

There are a couple of gotchas.

JVM

We need to rewrite the Host HTTP header to properly route HTTP requests from the internal YARN cluster to https://api-ro.discovery.wmnet.

To do so, we need to configure the JVM http-client to allow restricted headers.

Add the following to conf/flink-conf.yaml:

env.java.opts: -Dsun.net.http.allowRestrictedHeaders=true
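
For illustration, the routing this enables is equivalent to the following manual request, where the wiki named in the Host header is an arbitrary example (and the client must trust the internal CA):

# Route a request to enwiki through the internal read-only API endpoint via the Host header
curl -s -H 'Host: en.wikipedia.org' 'https://api-ro.discovery.wmnet/w/api.php?action=query&meta=siteinfo&format=json'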

Kerberos

Kerberos authentication is required to access WMF Analytics resources. The relevant config settings are found in conf/flink-conf.yaml:

security.kerberos.login.use-ticket-cache: true

# security.kerberos.login.keytab:

security.kerberos.login.principal: krbtgt/WIKIMEDIA@WIKIMEDIA

# The configuration below defines which JAAS login contexts

security.kerberos.login.contexts: Client,KafkaClient
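
Because the ticket cache is used, a valid ticket must exist before the cluster starts. On a stat machine it is typically obtained and inspected with the standard Kerberos tools:

# Obtain a ticket (prompts for your Kerberos password), then verify it
kinit
klist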

Scala-free Flink

The flink-scala dependencies must be removed from the Flink distribution; as of 1.15 we run Flink Scala-free. See https://flink.apache.org/2022/02/22/scala-free.html.

rm flink-1.15.0/lib/flink-scala*
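
A quick sanity check that the distribution is now Scala-free (assuming the stock 1.15 directory layout):

# Should print nothing once the flink-scala jars are removed
ls flink-1.15.0/lib/ | grep flink-scala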