SparkException: Parquet column cannot be converted in file

Got this error while reading the NYC yellow taxi data…


24/04/09 16:15:12 INFO FileScanRDD: Reading File path: /nyc_yellow_cab_data/2015/12/yellow_tripdata_2015-12.parquet, range: 0-134217728, partition values: [empty row]
24/04/09 16:15:12 WARN TaskSetManager: Lost task 4.0 in stage 2.0 (TID 101) (192.168.0.2 executor driver): org.apache.spark.SparkException: Parquet column cannot be converted in file file:nyc_yellow_cab_data/2015/04/yellow_tripdata_2015-04.parquet.

Column: [congestion_surcharge], Expected: double, Found: INT32.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.unsupportedSchemaColumnConvertError(QueryExecutionErrors.scala:868)



The parquet directory containing the data has files with different schemas.

i.e. one file has congestion_surcharge stored as DOUBLE while another file stores it as INT32, hence the mismatch. The fix is to rewrite the offending files with a consistent schema.

You can try mergeSchema = true, but in this case it won't work – schema merging cannot reconcile the same column having two incompatible types.
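One way to fix it is to rewrite the offending file(s) with the column cast to the expected type. Here is a minimal PySpark sketch – my own illustration, not from the original post – using the file path from the error message above; back up the data before replacing anything.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# Path of the offending file, taken from the error message above.
bad_file = "nyc_yellow_cab_data/2015/04/yellow_tripdata_2015-04.parquet"

# Read just that file, cast the column to the expected type, and write the
# repaired copy to a new location so the original is kept until verified.
df = spark.read.parquet(bad_file)
fixed = df.withColumn("congestion_surcharge", col("congestion_surcharge").cast("double"))
fixed.write.mode("overwrite").parquet(bad_file + ".fixed")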

Making github workflows a little Joyful

Recently at work, I came up with a project called Seeds – it's a curated/prepared dataset which we send through our pipeline, and at the end of it we verify the expected vs actual results.

Expected, you say? Since the dataset is custom prepared by us, we know its nature and hence what to expect. Example: prepare a dataset of 100 orders for customer 'Adam' and push it through your pipeline. At the end of the pipeline, I expect select count(*) from orders where customer = 'Adam' to equal 100.
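For illustration, a tiny Python sketch of what such a harvest check could look like – purely hypothetical, with run_query standing in for whatever executes SQL against your warehouse:

EXPECTED_ADAM_ORDERS = 100

def verify_harvest(run_query):
    # run_query() is a hypothetical helper that returns the scalar result of a SQL query
    actual = run_query("select count(*) from orders where customer = 'Adam'")
    assert actual == EXPECTED_ADAM_ORDERS, f"expected {EXPECTED_ADAM_ORDERS} orders, got {actual}"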

So I came up with Harvestor: it's a GitHub workflow which does the pipeline setup, plants the seeds (sends the data) and runs verification steps at various checkpoints in the pipeline. At the end, it verifies the Harvest, i.e. the final result.

The normal GitHub pipelines looked 🥱☹️, so I sprinkled in a few emojis to brighten up the place 🤗

The project naming resonated well with my teammates too!

That, combined with nice icons for the Slack channel, made the cake complete with cherries.

I call this DATA Art or DartA.

Let me know what you think. Hope this inspires you.

Lambda to Dedup less data – Spark vs OwnSolution

Hi guys,

Some data pipelines typically have a DeDup phase to remove duplicate records.

Well I came across a scenario where

  • the data to dedup was < 100 MB
  • our company goes with a serverless theme
  • plus we are a startup, so fast development is a given

So naturally, I thought of

  • AWS Step Functions (to serve as our pipeline) – as the input data is < 100 MB
  • Lambda for each phase of the pipeline
    • Dedup
    • Quality check
    • Transform
    • Load to warehouse

Now for the Dedup lambda: since I have used Spark before, I thought it would be about 3 lines of code and hence easy to implement.

Dataset<Row> inputData = sparkSession.read().json(inputPath);
Dataset<Row> deduped = inputData.dropDuplicates(columnName);
deduped.write().json(outputPath);
//using spark local mode - local[1]

I packaged it and ran the lambda … it took > 20 seconds for 10 MB of data ☹️

This got me thinking 🤔… this sounds like overkill.

So I decided to write my own code in Python (boto3 for S3 interaction + a simple hashmap to dedup).
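Here is a minimal sketch of what that Lambda can look like – illustrative only, not the exact production code. It assumes newline-delimited JSON records in S3 and a single dedup key; the event fields are hypothetical.

import json
import boto3

s3 = boto3.client("s3")

def handler(event, context):
    # These event fields are hypothetical – adapt to however your Step Function passes input.
    bucket = event["bucket"]
    key = event["key"]
    dedup_column = event["dedup_column"]   # e.g. "order_id"

    body = s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")

    seen = set()
    deduped = []
    for line in body.splitlines():
        if not line.strip():
            continue
        record = json.loads(line)
        value = record[dedup_column]
        if value not in seen:          # keep only the first record seen for each key
            seen.add(value)
            deduped.append(line)

    s3.put_object(Bucket=bucket, Key=key + ".deduped", Body="\n".join(deduped).encode("utf-8"))
    return {"output_records": len(deduped)}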

It ran in about 1.7 seconds for the same data – that is a 92.8% decrease 😮 in running time lol

So let's summarize:

                     | Apache Spark (local mode)          | Own solution (Python boto3 + hashmap)
Code effort          | very low (Spark does everything)   | medium (have to write code to download + dedup + upload to S3)
Mem requirements     | >= 512 MB (Spark needs min 512)    | < 128 MB
Mem used             | 600 MB                             | 78 MB
Running time (sec)   | 23.7 ❌                            | 1.7 ✅

Moral of the Story

  • When you think data pipelines, don’t always go to Apache Spark
  • Keep in mind the amount of data being processed
  • Less code is good for code maintenance but might not be performant
    • i.e. Ease of development is a priority but cost comes first.
  • Keep an eye on cost $ 💰
    • Here we have decreased cost by 92% as lambda bills per running time & memory used
Golang & Kubernetes – may not be a smooth ride sometimes

Golang & Kubernetes – may not be a smooth ride sometimes

These days many teams deploy apps via Kubernetes. At my current workplace Eyeota, we recently moved one of our Golang apps to Kubernetes.

We have a simple Golang application which reads messages from Kafka, enriches them via a Scylla lookup & pushes the enriched messages back to Kafka.

This app had been running well for years, until the move to Kubernetes: we noticed that Kafka lag started to grow at a high rate. During this time, our Scylla cluster had been experiencing issues & DB lookups were obviously taking time, which contributed to the delayed processing/Kafka lag.

Our devops team added more nodes to the Scylla cluster in an effort to reduce the DB slowness, but the lag continued to grow.

Steps to Mitigate:

  1. Increase number of pods: we increased from 4 to 6 pods. This did not help much.
  2. Reduce DB timeouts/retries: this did not help much either.

Further Analysis of Metrics Dashboard:

This time I chose to have a look at the Kubernetes dashboard and found that one particular metric was strange:

"kubernetes cpu pod throttling"!

When I checked other apps, this metric was quite low, but not for my app. It would seem that Kubernetes was throttling my app.

Fix

Based on this Golang bug (runtime: make GOMAXPROCS cfs-aware on GOOS=linux), it would seem that Go did not respect the CPU quota set for it in Kubernetes, i.e. Go was setting GOMAXPROCS to a value greater than the quota (it uses the node's CPU count, not the container's cgroup limit).

We run a lot of goroutines in our code!

To work around this, we needed to import Uber's 'automaxprocs' lib in our code, which makes sure that Go sets GOMAXPROCS according to the Kubernetes CPU quota.

i.e. a simple one line change

import _ "go.uber.org/automaxprocs"

Voilà, pushed the change and deployed it.

The effects were as follows:

  1. Pod throttling metric decreased ⬇️ ✅
  2. Golang GC times decreased ⬇️ ✅
  3. Kafka message consumption increased ↗️ ✅ & lag decreasing ↘️ ✅

After the fix was deployed, we noticed an increase in msg consumption. On increasing the number of pods from 4 to 8 (note: pod quotas remained unchanged), we noticed that consumption increased even more (horizontal scaling was working).

Lag also began to drop nicely 😇.

Recommendation:

If you are using Golang, do check if pod throttling is occurring. You might be happy with your app even if it's happening, but if it's removed, it can do better 😀


Any feedback is appreciated. Thanks for reading.

PS: Golang version 1.12, Kubernetes version v1.18.2 on Linux.

Understanding Docker In Docker with Gitlab CI

I have been playing around with GitLab recently, trying to create a build pipeline for my Golang app.

So I thought of documenting my learnings visually about Docker in Docker and the interactions inside the Docker executor of the GitLab runners.

I recommend you read a little about GitLab CI and play around with it a bit to get a feel for the GitLab workflow.

Lets say we have a project which basically does the following:

  • connects to an httpd server and pulls information.
  • processes the information and pushes it to Kafka / MySQL

Assume we write the code in Golang and write normal tests along with integration tests.

The integration tests rely on launching an httpd server / Kafka / ZooKeeper / MySQL as Docker containers.

 

For that lets say we do the following :

  • we have a docker-compose file which launches Kafka / ZooKeeper only
  • we manually launch the httpd server using 'docker run' (we could launch it in docker-compose too, but let's do it separately as an exercise, for the sake of learning how the containers interact in GitLab CI)
  • we manually launch MySQL (again separately, for the same reason)

i.e.

  • docker-compose up
  • docker run -t httpd
  • #start mysql somehow
  • go test

 

Now we come up with a GitLab CI definition file which specifies 2 stages – test, and building the Docker image for our app.

Here is a sample gitlab-ci file (you may ignore the details for now), but remember we intend to run tests and build a Docker image.

Screenshot 2020-04-24 at 20.56.06

So lets focus on the TEST stage,

  • Every build defined in the gitlab-ci.yml file is allocated to a GitLab runner process.

 

  • Each GitLab runner executes the build using an executor (there are many types of executors).

 

  • We shall configure our runner to use the docker executor.

 

The following diagram illustrates what happens when we run the build in gitlab CI.

Screenshot 2020-04-24 at 20.58.01

 

The above diagram focuses on the TEST_JOB of the TEST stage.

Hope the diagram is self-explanatory!

PS: hope you noticed the Docker in Docker in Docker

i.e. the httpd container, inside the dind container, inside the Docker executor container.

Any feedback is welcome.

Install librdkafka on alpine docker

I made a few optimizations to reduce the image size from 342 MB down to 218 MB and finally down to 53 MB.

Here is the gist of the Dockerfile.


FROM alpine:3.9
ENV LIBRD_VER=1.3.0
WORKDIR /tmp
#vishnus-MacBook-Pro:librd vrao$ docker images |grep lib
#lib proper_cleanup 675073279e9c 4 seconds ago 53.3MB
#lib cleanup 7456af7df73b 2 minutes ago 218MB
#lib simple 9724aed9519c 7 minutes ago 342MB
######### simple install ################# -> 342MB
#RUN apk add --no-cache --virtual .make-deps bash make wget git gcc g++ && apk add --no-cache musl-dev zlib-dev openssl zstd-dev pkgconfig libc-dev
#RUN wget https://github.com/edenhill/librdkafka/archive/v${LIBRD_VER}.tar.gz
#RUN tar -xvf v${LIBRD_VER}.tar.gz && cd librdkafka-${LIBRD_VER} && ./configure --prefix /usr && make && make install
##########################################
######### cleanup deps ################ -> 218 MB
#RUN apk add --no-cache --virtual .make-deps bash make wget git gcc g++ && apk add --no-cache musl-dev zlib-dev openssl zstd-dev pkgconfig libc-dev
#RUN wget https://github.com/edenhill/librdkafka/archive/v${LIBRD_VER}.tar.gz
#RUN tar -xvf v${LIBRD_VER}.tar.gz && cd librdkafka-${LIBRD_VER} && ./configure --prefix /usr && make && make install && make clean
#RUN rm -rf librdkafka-${LIBRD_VER} && rm -rf v${LIBRD_VER}.tar.gz && apk del .make-deps
##########################################
######### optimised size with proper cleanup ################# -> 53MB
# You need to install the deps, build the library, and uninstall the deps in a SINGLE command so that the image size is not polluted by these deps. Example: if you run 'apk del' as a separate RUN command, the size does not reduce.
# Docker works on the concept of layers: each RUN adds a layer and you cannot delete a previous layer -> https://github.com/gliderlabs/docker-alpine/issues/186
# So we install the deps, build librdkafka and then uninstall/clean up the deps in a single command to keep the size small.
# Size of the docker image is > 250 MB without this logic, ~53MB with it.
# For debugging, run each command as a separate RUN instead of &&-ing them.
RUN apk add --no-cache --virtual .make-deps bash make wget git gcc g++ && apk add --no-cache musl-dev zlib-dev openssl zstd-dev pkgconfig libc-dev && wget https://github.com/edenhill/librdkafka/archive/v${LIBRD_VER}.tar.gz && tar -xvf v${LIBRD_VER}.tar.gz && cd librdkafka-${LIBRD_VER} && ./configure --prefix /usr && make && make install && make clean && rm -rf librdkafka-${LIBRD_VER} && rm -rf v${LIBRD_VER}.tar.gz && apk del .make-deps
################################################################
RUN export PKG_CONFIG_PATH=$PKG_CONFIG_PATH:/usr/lib/pkgconfig/
ENV PKG_CONFIG_PATH=$PKG_CONFIG_PATH:/usr/lib/pkgconfig/


Any feedback is always welcome.

Kafka – Restoring Under replicated Partition & Broker to ISR

I recently came across a scenario in Kafka where our brokers had been pushed out of the ISR and partitions had been declared under-replicated. This situation had been going on for weeks & our brokers couldn't seem to catch up. Due to this paralysis, partition reassignment was also failing/stuck.

Inspired by this blog post on Kafka storage internals, I came up with a procedure to bring the brokers back into the ISR.

Before I go into the procedure, here are a few things to understand:

  1. The offset of a message in a partition remains the same in both the leader broker and the replica broker.
  2. The position of a message with a certain offset in the log file of the leader broker may or may not be the same in the replica broker. It does not matter whether the position matches, because a consumer is only interested in the offset, not where it is located in the file. In practice, based on my tests, I have noticed it is the same, and I believe it should be: if a message with offset 0 in the leader broker is, say, 10 bytes in length, offset 1 should start right after it, and I would think the same applies to the replica broker, since messages are ordered and storing the message at a different position in the replica would not make sense – it would be complex to have such logic.
  3. There are 3 important files in a broker's log.dir to take note of:
    1. replication-offset-checkpoint -> for a topic partition, the broker stores the latest offset it has successfully replicated.
    2. recovery-point-offset-checkpoint -> for a topic partition, the broker stores/tracks which messages (from-to offset) were successfully checkpointed (flushed) to disk.
    3. log-start-offset-checkpoint -> for a topic partition, the broker stores the first/earliest offset it has available at the moment. Refer here.
    4. The format of the above 3 files is the following:
      1. the first line is the version
      2. the 2nd line is the number of entries that follow
      3. each entry line has the topic name, followed by the partition, followed by the offset
Screenshot 2020-03-31 at 16.00.13
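To make the format concrete, here is a small Python sketch (my own illustration, not part of the original procedure) that parses one of these checkpoint files; the path in the example is just the one used later in this post.

def read_checkpoint(path):
    # Format: line 1 = version, line 2 = number of entries,
    # then one "topic partition offset" entry per line.
    with open(path) as f:
        lines = [line.strip() for line in f if line.strip()]
    version = int(lines[0])
    num_entries = int(lines[1])
    entries = {}
    for line in lines[2:2 + num_entries]:
        topic, partition, offset = line.rsplit(" ", 2)
        entries[(topic, int(partition))] = int(offset)
    return version, entries

# Example:
# read_checkpoint("/Users/vrao/work/utils/data/kafka-logs2/replication-offset-checkpoint")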

Test Conditions to induce Under replication:

  1. 3 brokers, 1 zookeeper (kafka_2.12-1.1.0 / zookeeper-3.4.13)
  2. ./kafka-topics.sh --create --topic test_isr --partitions 2 --replication-factor 2 --zookeeper localhost:2181
  3. brokers with ids: 0, 1, 2
  4. ./kafka-producer-perf-test.sh --throughput 1000 --record-size 1024 --topic test_isr --num-records 100000 --producer-props acks=1 bootstrap.servers=192.168.0.167:9092,192.168.0.167:9093,192.168.0.167:9094

Screenshot 2020-03-31 at 16.28.35Screenshot 2020-03-31 at 16.28.45

We notice here that brokerId 1 is not a leader. Let's focus on this guy for test simplicity.

Test Starting:

  1. brokers up
  2. start the perf producer
  3. after 2 iterations of producing, stop/kill broker id 1
  4. wait for the perf producer to finish
  5. you will notice that broker 1 is now out of the ISR and partition 0 is under-replicated

Screenshot 2020-03-31 at 16.38.46

Screenshot 2020-03-31 at 16.41.45

Broker 1 is out of the ISR & partition 0 is under-replicated, and we have 100k messages published.

Recovery for Partition 0:

Note: my Kafka log dirs for brokers 0, 1, 2 are the following, respectively:

  • /Users/vrao/work/utils/data/kafka-logs1 -> broker 0
  • /Users/vrao/work/utils/data/kafka-logs2 -> broker 1
  • /Users/vrao/work/utils/data/kafka-logs3 -> broker 2

On broker id 1:

1. Go to the partition dir and move existing data to a backup dir.

  1. mkdir /tmp/backup_test_isr-0;

  2. mv /Users/vrao/work/utils/data/kafka-logs2/test_isr-0/* /tmp/backup_test_isr-0/

2. Stop the leader broker, i.e. brokerId 2 (i.e. stop further writes).

3. Copy the data of partition 0 from broker 2 to broker 1:

Screenshot 2020-03-31 at 17.06.02

4. Now find the latest replicated offset in brokerId 2 from the replication-offset-checkpoint file.

Screenshot 2020-03-31 at 17.08.06

It's 50000 for partition 0 of topic test_isr.

4.1 Now edit the replication-offset-checkpoint of broker 1 and update the value to 50000.

Before editing, you will notice the offset it has replicated is less than 50k, i.e.

Screenshot 2020-03-31 at 17.10.08

Now modify the file and update 5514 to 50000 (5514 means it was able to replicate successfully up to offset 5514).

After modification the file should be as follows:

Screenshot 2020-03-31 at 17.12.00

5. Now find the latest checkpointed offset in brokerId 2 from the recovery-point-offset-checkpoint file.

Screenshot 2020-03-31 at 17.13.37

5.1 Now edit the recovery-point-offset-checkpoint of broker 1 and update the value to 46353.

Before editing, you will notice something like

Screenshot 2020-03-31 at 17.14.41

Now modify the file and update 5515 to 46353 (5515 means it was able to checkpoint successfully up to offset 5515).

After modification the file should be as follows:

Screenshot 2020-03-31 at 17.15.56

6. Now follow the same steps (as in 4/4.1 and 5/5.1) with the file log-start-offset-checkpoint,

i.e. <log.dir>/log-start-offset-checkpoint

Make sure that the offset for partition 0 of our topic in the leader broker is the same in the replica broker,

i.e. copy the value for partition 0 from /Users/vrao/work/utils/data/kafka-logs3/log-start-offset-checkpoint to /Users/vrao/work/utils/data/kafka-logs2/log-start-offset-checkpoint.

If the file is empty or has no data, then you have nothing to do 🙂

7. Now we are ready to start broker id 1.

You will notice it joins the cluster and the ISR has expanded back!

Screenshot 2020-03-31 at 17.25.37Screenshot 2020-03-31 at 17.22.53

8. Now let's verify that things are good with regard to the number of messages.

Shut down the leader node, i.e. brokerId 2, so that our replica becomes the new leader.

In our test, partition 1 was never affected, so it will have 50k msgs; and our replica has now become leader for partition 0, i.e. for the other 50k msgs.

So we should get 100k msgs.

The following screenshot shows that our replica is now the leader (brokerId 1 is leader) and the erstwhile leader is offline.

Screenshot 2020-03-31 at 17.29.40

9. Now start a consumer on the topic and you should get 100k msgs.

./kafka-console-consumer.sh --topic test_isr --bootstrap-server 192.168.0.167:9092,192.168.0.167:9094 --from-beginning

Screenshot 2020-03-31 at 17.36.02

So recovery is complete! We were able to recover the erstwhile out-of-ISR broker, and the partition is no longer under-replicated.

PS: I did a video recording of the same scenario with 50k msgs; you can find part 1 here and part 2 here.

PS: I have seen some blog posts where people manually move data and edit the checkpoint files to recover brokers. I hope the above steps are logical. I hope to try this in production! Will update if I do. From my test, things look promising.

And as always, feedback is welcome.

 

———————————————————————————————————————————————————————————-

References:

My thanks to the authors of :


https://medium.com/@durgaswaroop/a-practical-introduction-to-kafka-storage-internals-d5b544f6925f

http://ixiaosi.art/2019/05/20/kafka/kafka启动分析/

https://stackoverflow.com/questions/35407090/explain-replication-offset-checkpoint-and-recovery-point-offset-in-kafka

For some details on log-start-offset-checkpoint you may refer to this Kafka Confluence page too. Search for 'log-begin-offset-checkpoint'.

Tools used:

https://github.com/obsidiandynamics/kafdrop

 

 

 

Scylla : received an invalid gossip generation for peer [How to resolve]

At my current org eyeota.com, we run a lot of Scylla & Cassandra clusters.

Our Scylla cluster for analytics/data science has > 40 nodes holding terabytes of data & runs on version = 2.1.3-0.20180501.8e33e80ad.

The cluster had recently been losing nodes: those nodes restarted & were not allowed to rejoin the cluster during the gossip phase of bootup. The reason: nodes which were status=UN (up & normal) were spewing the following error & not allowing the affected nodes to join the cluster during the gossip phase.

Jul 04 01:54:17 host-10.3.7.77 scylla[30263]: [shard 0] gossip – received an invalid gossip generation for peer 10.3.7.7; local generation = 1526993447, received generation = 1562158865

Now let's go into the details & context of the above error message:

  • Every node is configured with a list of seeds, to which it tries to gossip & gather cluster info during bootup.

 

  • On bootup it creates a 'generation' number (the generation number is an epoch) which it shares with the seed host during gossip (refer code: here). Screenshot 2019-07-05 at 11.21.44

 

  • On its FIRST ever bootup, the node sends its generation number to the seed & the seed gossips with others to pass on the info. The seed stores this generation number as a reference. This is the local_generation term in the above error message: the UN node 10.3.7.77 was saying that peer 10.3.7.7 was sending a generation number of 1562158865 (the remote_generation) but it had stored 1526993447 as its reference. You will notice that 1526993447 is the epoch for May 22, 2018 & 1562158865 is the epoch for July 3, 2019, i.e. the node 10.3.7.7 first started on May 22, 2018 & had sent its generation number as 1526993447.

 

  • Since the difference between the 2 epochs is greater than 1 year, the UN node will refuse to allow the other node to join (see code here).

Screenshot 2019-07-05 at 11.46.06        Screenshot 2019-07-05 at 11.45.46

 

  • Now during bootup, the logic for increment_and_get is as follows (refer code: here): Screenshot 2019-07-05 at 11.24.44.png
  • From the above logic, the server first looks up the generation number from the system.local table. If the value is empty, it generates a fresh number, i.e. the current time, as the logic for generating a generation number depends solely on the current time (see here or the pic below). If it is not empty, it compares it with the current time & uses the larger value, i.e. the more recent time, & writes it back to the system.local table (hence this stackoverflow suggestion of updating the table won't work).

Screenshot 2019-07-05 at 11.28.39

  • So the generation number generated & sent by a node to the seed on bootup is, in general, close to the current time, but the generation number stored by the seed UN node as a local reference does not change.
  • In our example error message, the UN node 10.3.7.77 reports that peer 10.3.7.7 is sending 1562158865, but since 1526993447 + 1 year < 1562158865, it won't be allowed to join.
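To make the arithmetic concrete, here is a small Python sketch of the check described above – my own illustration using the epochs from the error message; the exact check in the Scylla code may differ slightly.

MAX_DIFFERENCE_SECONDS = 365 * 24 * 3600   # the ~1 year window

local_generation = 1526993447    # reference stored by the seed for 10.3.7.7 (May 22, 2018)
remote_generation = 1562158865   # generation sent by 10.3.7.7 on reboot (July 3, 2019)

# The seed rejects the peer because the received generation is more than
# a year ahead of the reference it has stored.
print("allowed to join:", remote_generation <= local_generation + MAX_DIFFERENCE_SECONDS)  # -> False

# After resetting the node's clock to March 31, 2019 (epoch 1554030000) and letting
# the seed update its reference, a generation taken at the real current time
# (July 4, 2019, epoch 1562215230) falls back inside the window:
new_local_reference = 1554030000
current_generation = 1562215230
print("allowed now:", current_generation <= new_local_reference + MAX_DIFFERENCE_SECONDS)  # -> True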

 

So how do we solve this problem:

(1) We tried multiple rolling restarts as suggested on the internet, but no joy! – it might or might not work.

(2) Stop all clients, stop all nodes & start them ALL up within a window of a few minutes – this will work, as generation numbers will be reset across the cluster (I have not verified this reasoning, but people on the internet have reported this approach to work). The only problem is the DOWNTIME of an entire cluster being rebooted.

(3) To avoid an entire cluster reboot: we adopted this approach in production, based on the code logic explained above.

  • The root problem was that the local generation of the problematic node stored in the UN seed node was not changing (while the problematic node on every reboot sends a fresh gen number which is close to the current time).
  • IDEA: Let's update the local generation of the problematic node stored in the UN seed node, so that the remote generation number being sent by the problematic node falls within 1 year of it.
    • So how do we update this value in the UN seed node?
      • We need to make the problematic node send a gen number (epoch) with a value which falls within the 1 year window of the local gen number stored in the UN seed node.
      • But since the code always takes the current time as the gen number, and the current time is July 2019, what can we do?
      • We change the TIME back on the problematic node to a value which falls within 1 year of 1526993447. Choose an epoch value towards the end of the 1 year window, i.e. change the system time to a value like March 31, 2019 (epoch 1554030000) rather than, say, October 2, 2018, & restart the node. The node will reboot & send a gen number of 1554030000 (as it looks up the system.local table) – or the current time, which is March 31, 2019 anyway – to the seed.
      • The UN seed node gets this value & validates that the remote generation number sent by the problematic node is within 1 year of May 22, 2018, so it proceeds to update its reference (local generation).
      • Voilà, we have successfully updated the reference (local gen) of the problematic node stored in the UN seed node.
      • Now we stop the problematic node, reset its time back to the current time & reboot; the problematic node will send the latest epoch of, say, July 4, 2019, i.e. epoch 1562215230.
      • Now since 1562215230 (gen sent by the problematic node using the latest time) minus 1554030000 (local reference stored in the UN seed node) < 1 year, the problematic node will be allowed to join the cluster.
      • Hurray!
      • We advise you to choose an epoch/date towards the end of the 1 year window, but within 1 year – the later the better, as a new 1 year window starts from the date you choose & the problem is mitigated for that long LOL. YEP, this problem occurs on long-running clusters. What this means is that you need to do rolling restarts once in a while to extend the 1 year window. Reminds me of visa extensions hehe.

Steps:

  1. If the problematic node is 10.3.7.7 and the error is reported on, say, 10.3.7.77 (a UN node), make sure the seed for 10.3.7.7 is 10.3.7.77, so that we guarantee it is talking to this node and we don't have to search the cluster to find out who it is talking to. If the seed for the 7.7 node is different from the node reporting the error, then look at the error message printed by that seed node to decide which epoch to reset to. In our case, since I saw the error on 7.77, I changed the seed of 7.7 to the 7.77 node.
  2. start the problematic node.
  3. the seed node should start printing the error. Capture the error message for our node and make note of the local gen number, so we can choose a date to reset to. In our case the msg was as follows:
  4. Jul 04 01:54:17 host-10.3.7.77 scylla[30263]: [shard 0] gossip – received an invalid gossip generation for peer 10.3.7.7; local generation = 1526993447, received generation = 1562158865

  5. cqlsh to the problematic node 10.3.7.7 & update the generation number to an epoch within 1 year of 1526993447, BUT choose an epoch towards the end of the 1 year window, like 1554030000 (March 31, 2019), rather than say July/October 2018, so that you have a longer new 1 year window.
  6. #on problematic node, run

    1. update system.local set gossip_generation = 1554030000 where key='local';
  7. #on problematic node, run

    1. nodetool flush
  8. stop problematic node
  9. edit the config file & change the CQL port (native_transport_port) from 9042 to 9043, so that clients can't connect & insert data – inserting data during this phase would create records with a timestamp of March 2019, which is not correct, i.e. this prevents data corruption. This is a precaution.
  10. change the system time, i.e. "date -s '31 MAR 2019 11:03:25'"
  11. verify the system time has changed by running the date command
  12. start the problematic node & tail the logs of the UN seed node; the error should go away.
  13. wait for some time (a few minutes is enough) for gossip to occur & verify that the problematic node is now UN. Screenshot 2019-07-05 at 14.01.47
  14. You can tail the logs of the UN seed node & check if you still get the error.
    1. if you do see the error again, repeat the steps from the start – you might have missed something.
  15. once the node is declared UN:
    1. shut down the node
    2. change the CQL port (native_transport_port) back from 9043 to 9042 in the config file
    3. RESET the system time on the box
    4. verify the system time is back to normal
  16. start up the node once you have changed back the time and port.
  17. You will notice that the node is still UN 🙂

Confessions:

  1. Yes, we did this exercise in production. The node was considered dead anyway, hence the risk was minimal: screwing up a dead node even more won't make a difference, and if the procedure failed, we would sacrifice only 1 node and be left with the sole remaining option of a cluster reboot.
  2. We scanned the Scylla code base (master branch) for usages of system time in cluster communication and found only 2 places, which gave us confidence that changing the system time would work. Also, by changing the CQL port to 9043, we eliminated any contamination of existing data by clients. Screenshot 2019-07-05 at 14.13.51.png

Morals of the story:

  1. This happened on version 2.1 of Scylla, and as of today, July 4, 2019, the master branch of Scylla still has the same code logic, hence this can occur in version 3 and above too.
  2. It is better to do rolling restarts of the nodes every few months, so that the nodes send a new gen number for gossip and the 1 year window is extended.
  3. If you have a cluster running for more than 1 year and a node is restarted, it can be affected by this error; the more node restarts that happen, the more the epidemic spreads.
  4. This can work for Cassandra if the code logic is the same, which I think it is.

References:

https://github.com/scylladb/scylla/blob/134b59a425da71f6dfa86332322cc63d47a88cd7/gms/gossiper.cc

https://github.com/scylladb/scylla/blob/94d2194c771dfc2fb260b00f7f525b8089092b41/service/storage_service.cc

https://github.com/scylladb/scylla/blob/077c639e428a643cd4f0ffe8e90874c80b1dc669/db/system_keyspace.cc

https://stackoverflow.com/questions/40676589/cassandra-received-an-invalid-gossip-generation-for-peer

Thanks

to my teammate Rahul Anand for helping out in debugging the issue & providing key breakthroughs during the firefighting.

If you find this useful, please comment on either Stack Overflow or here. Thanks.

Update:

The folks at scylla have now fixed this.

refer: https://github.com/scylladb/scylla/pull/5195

golang: Parse a large json file in streaming manner

Hi all,

I recently tried to parse a large JSON file in Golang and used

'ioutil.ReadAll(file)' followed by 'json.Unmarshal(data, &msgs)'

This led to a huge spike in memory usage, as the entire file is read into memory in one go.

I wanted to parse the file in a streaming fashion, i.e. read the file bit by bit and decode the JSON data, just like the Jackson streaming API.

I did some digging and found

func NewDecoder(r io.Reader) *Decoder {...} in the encoding/json package

The above uses a reader to buffer & pick up data.

here is some sample code to do it:

https://github.com/jaihind213/golang/blob/master/main.go

The differences are in terms of memory footprint:

We print memory usage as we parse a ~900MB file generated by generate_data.sh.

Streaming Fashion:

Screenshot 2019-05-07 at 11.20.15

….

Screenshot 2019-05-07 at 11.31.36

Simple Parsing by reading Everything in Mem:

Screenshot 2019-05-07 at 11.32.28

Screenshot 2019-05-07 at 11.32.54

Conclusion:

The streaming parser has a very low Heap_inUse when compared to the massive usage of the readAllInMem approach.

Druid Indexing Fails – ISE – Cannot find instance of indexer to talk to!

Recently my druid overlord and middle manager stopped indexing data.

I tried the basic multiple restarts but it did not help.

The logs of the middle manager revealed the following:

2018-10-02 06:01:37 [main] WARN  RemoteTaskActionClient:102 - Exception submitting action for task[index_nom_14days_2018-10-02T06:01:29.633Z]

java.io.IOException: Failed to locate service uri

        at io.druid.indexing.common.actions.RemoteTaskActionClient.submit(RemoteTaskActionClient.java:94) [druid-indexing-service-0.10.0.jar:0.10.0]

        at io.druid.indexing.common.task.IndexTask.isReady(IndexTask.java:150) [druid-indexing-service-0.10.0.jar:0.10.0]

        at io.druid.indexing.worker.executor.ExecutorLifecycle.start(ExecutorLifecycle.java:169) [druid-indexing-service-0.10.0.jar:0.10.0]

        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_181]

        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_181]

        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_181]

        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_181]

        at io.druid.java.util.common.lifecycle.Lifecycle$AnnotationBasedHandler.start(Lifecycle.java:364) [java-util-0.10.0.jar:0.10.0]

        at io.druid.java.util.common.lifecycle.Lifecycle.start(Lifecycle.java:263) [java-util-0.10.0.jar:0.10.0]

        at io.druid.guice.LifecycleModule$2.start(LifecycleModule.java:156) [druid-api-0.10.0.jar:0.10.0]

        at io.druid.cli.GuiceRunnable.initLifecycle(GuiceRunnable.java:102) [druid-services-0.10.0.jar:0.10.0]

        at io.druid.cli.CliPeon.run(CliPeon.java:277) [druid-services-0.10.0.jar:0.10.0]

        at io.druid.cli.Main.main(Main.java:108) [druid-services-0.10.0.jar:0.10.0]

Caused by: io.druid.java.util.common.ISE: Cannot find instance of indexer to talk to!

        at io.druid.indexing.common.actions.RemoteTaskActionClient.getServiceInstance(RemoteTaskActionClient.java:168) ~[druid-indexing-service-0.10.0.jar:0.10.0]

        at io.druid.indexing.common.actions.RemoteTaskActionClient.submit(RemoteTaskActionClient.java:89) ~[druid-indexing-service-0.10.0.jar:0.10.0]

The main cause was

Caused by: io.druid.java.util.common.ISE: Cannot find instance of indexer to talk to!

Apparently the middle manager and its peons could not find the overlord service needed for indexing to start, and hence all jobs were failing.

Root Cause:

I logged into ZooKeeper and found the following two entries for the Coordinator/Overlord to be empty, even though both services were up and running.

Since they were empty, the middle manager could not find an indexer to talk to..!! as the middle manager looks these up in ZooKeeper.

[zk: localhost:2181(CONNECTED) 1] ls /druid/discovery/druid:coordinator
[]
[zk: localhost:2181(CONNECTED) 3] ls /druid/discovery/druid:overlord
[]

Fix:

Assuming that the Coordinator runs on Host_A & the Overlord runs on Host_B … do the following:

(1) Generate 2 random ids: 1 for the Coordinator & 1 for the Overlord.

Druid's code uses the following Java call:

System.out.println(java.util.UUID.randomUUID().toString());

You can use any random string.

Let's assume you generate

'45c3697f-414f-48f2-b1bc-b9ab5a0ebbd4' for the coordinator

&

'f1babb39-26c1-42cb-ac4a-0eb21ae5d77d' for the overlord.

(2) log into zookeeper cli and run the following commands:

[zk: localhost:2181(CONNECTED) 3] create /druid/discovery/druid:coordinator/45c3697f-414f-48f2-b1bc-b9ab5a0ebbd4 {"name":"druid:coordinator","id":"45c3697f-414f-48f2-b1bc-b9ab5a0ebbd4","address":"HOST_A_IP","port":8081,"sslPort":null,"payload":null,"registrationTimeUTC":1538459656828,"serviceType":"DYNAMIC","uriSpec":null}

[zk: localhost:2181(CONNECTED) 3] create /druid/discovery/druid:overlord/f1babb39-26c1-42cb-ac4a-0eb21ae5d77d {"name":"druid:overlord","id":"f1babb39-26c1-42cb-ac4a-0eb21ae5d77d","address":"HOST_B_IP","port":8090,"sslPort":null,"payload":null,"registrationTimeUTC":1538459763281,"serviceType":"DYNAMIC","uriSpec":null}

You will notice a registrationTimeUTC field – put in any time you see fit.

This should create the necessary ZooKeeper nodes for the middle manager to look up.

Now restart the Coordinator, then the Overlord, then the MiddleManager.

Submit an indexing task and it should work.
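If you prefer scripting the znode creation instead of typing into the ZooKeeper CLI, here is a rough equivalent using the kazoo Python client – my own sketch, not part of the original fix; the UUIDs, IPs, ports and timestamps are the example values from above, so adjust them to your cluster.

import json
from kazoo.client import KazooClient

zk = KazooClient(hosts="localhost:2181")
zk.start()

def register(service, node_id, address, port, registration_time_ms):
    # Build the same discovery payload as the zk CLI commands above.
    payload = {
        "name": service,
        "id": node_id,
        "address": address,
        "port": port,
        "sslPort": None,
        "payload": None,
        "registrationTimeUTC": registration_time_ms,
        "serviceType": "DYNAMIC",
        "uriSpec": None,
    }
    path = "/druid/discovery/{}/{}".format(service, node_id)
    zk.create(path, json.dumps(payload).encode("utf-8"), makepath=True)

register("druid:coordinator", "45c3697f-414f-48f2-b1bc-b9ab5a0ebbd4", "HOST_A_IP", 8081, 1538459656828)
register("druid:overlord", "f1babb39-26c1-42cb-ac4a-0eb21ae5d77d", "HOST_B_IP", 8090, 1538459763281)

zk.stop()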

Additional tips:

In $Druid_Home/conf/druid/middleManager/runtime.properties you may have

druid.indexer.runner.javaOpts=-cp conf/druid/_common:conf/druid/middleManager/:conf/druid/middleManager:lib/* -server -Ddruid.selectors.indexing.serviceName=druid/overlord -Xmx7g -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Dhadoop.mapreduce.job.user.classpath.first=true

and $Druid_Home/conf/druid/middleManager/jvm.config you may have

-Ddruid.selectors.indexing.serviceName=druid/overlord

Hope this helps you.