Module 3.2: Accessing Meetup.com RSVP Stream API using Scala Kafka Producer


Prerequisites

  • Java 1.8
  • Scala 2.12.8
  • IntelliJ IDEA Community Edition

Walk-through

In this article, we discuss how to retrieve stream data from the Meetup.com RSVP Stream API using a Scala Kafka Producer.

This Scala Kafka Producer program reads stream data (RSVP messages) from http://stream.meetup.com/2/rsvps with an HTTP GET request and publishes each RSVP message as a JSON string to the Kafka topic named "meetuprsvptopic".



kafka_producer_demo.scala


package com.datamaking.meetup.rsvp.stream

import java.net.URL
import java.util.Properties

import com.fasterxml.jackson.core.JsonFactory
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object kafka_producer_demo {

  def main(args: Array[String]): Unit = {
    println("Kafka Producer Application Started ...")

    // Code Block 1 Starts Here

    // Code Block 1 Ends Here

    // Code Block 2 Starts Here

    // Code Block 2 Ends Here

    // Code Block 3 Starts Here

    // Code Block 3 Ends Here

    // Code Block 4 Starts Here

    // Code Block 4 Ends Here

    println("Kafka Producer Application Completed.")
  }
}

Each code block below goes between the matching "Starts Here" and "Ends Here" comments inside main.


    // Code Block 1 Starts Here
    val kafka_topic_name = "meetuprsvptopic"
    val kafka_bootstrap_servers = "localhost:9092"

    // meetup.com RSVP stream REST API endpoint
    val meetup_rsvp_stream_api_endpoint = "http://stream.meetup.com/2/rsvps"
    // Code Block 1 Ends Here


    // Code Block 2 Starts Here
    val url_object = new URL(meetup_rsvp_stream_api_endpoint)
    val connection_object = url_object.openConnection()
    val jsonfactory_object = new JsonFactory(new ObjectMapper)
    val parser_object = jsonfactory_object.createParser(connection_object.getInputStream)
    // Code Block 2 Ends Here
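
Code Block 2 relies on the Jackson parser reading one JSON value after another from the same input stream. The sketch below shows that behaviour without a network connection. It assumes the Jackson libraries are on the classpath; the object name JsonStreamSketch, the helper readMessages, and the two sample RSVP objects are made up for illustration and are not real Meetup data.

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.nio.charset.StandardCharsets

import com.fasterxml.jackson.core.JsonFactory
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}

import scala.collection.mutable.ListBuffer

// Hypothetical helper object, not part of the original program.
object JsonStreamSketch {

  // Reads every top-level JSON value from the stream and returns each one as a JSON string.
  def readMessages(in: InputStream): List[String] = {
    // The ObjectMapper is the codec that readValueAsTree needs to build a tree.
    val parser = new JsonFactory(new ObjectMapper).createParser(in)
    val messages = ListBuffer.empty[String]
    try {
      // nextToken() returns null once the input is exhausted.
      while (parser.nextToken() != null) {
        messages += parser.readValueAsTree[JsonNode]().toString
      }
    } finally {
      parser.close()
    }
    messages.toList
  }

  def main(args: Array[String]): Unit = {
    // Two made-up objects, one after another, standing in for the live stream.
    val sample =
      """{"rsvp_id": 1, "response": "yes"}
        |{"rsvp_id": 2, "response": "no"}""".stripMargin
    val in = new ByteArrayInputStream(sample.getBytes(StandardCharsets.UTF_8))
    readMessages(in).foreach(println)
  }
}
```

With the live endpoint the loop does not end on its own, because the stream stays open until the connection is closed.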


    // Code Block 3 Starts Here
    // Kafka producer settings: broker address, acknowledgement level,
    // and String serializers for the record key and value
    val properties_object = new Properties()
    properties_object.put("bootstrap.servers", kafka_bootstrap_servers)
    properties_object.put("acks", "all")
    properties_object.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties_object.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // Code Block 3 Ends Here
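
The producer settings in Code Block 3 can also be written with the constants from ProducerConfig instead of string literals, which helps catch typos in property names at compile time. This is a minimal sketch, assuming kafka-clients is on the classpath; the object name ProducerPropsSketch and the helper producerProperties are hypothetical.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer

// Hypothetical helper object, not part of the original program.
object ProducerPropsSketch {

  // Builds the same four producer settings used in Code Block 3.
  def producerProperties(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    // "all" waits for the full set of in-sync replicas to acknowledge each record.
    props.setProperty(ProducerConfig.ACKS_CONFIG, "all")
    props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props
  }

  def main(args: Array[String]): Unit = {
    // Print the resulting settings for a local broker.
    producerProperties("localhost:9092").list(System.out)
  }
}
```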


    // Code Block 4 Starts Here
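    val kafka_producer_object = new KafkaProducer[String, String](properties_object)
    try {
      // Each top-level JSON value in the stream is one RSVP message
      while (parser_object.nextToken() != null) {
        // State the tree type explicitly so the compiler does not have to infer it
        val message_record = parser_object.readValueAsTree[com.fasterxml.jackson.databind.JsonNode]().toString
        println(message_record)
        val producer_record_object = new ProducerRecord[String, String](kafka_topic_name, message_record)
        kafka_producer_object.send(producer_record_object)
      }
    } finally {
      // close() waits for pending sends and releases resources, even if the stream fails
      kafka_producer_object.close()
    }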
    // Code Block 4 Ends Here
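
The send call in Code Block 4 is asynchronous, so the loop does not see whether a record was actually accepted. One way to observe the outcome is to pass a callback to send. The sketch below is an illustration under assumptions, not the article's program: the object name SendWithCallbackSketch and the helper publish are hypothetical, and MockProducer from kafka-clients stands in for a real broker so the example runs without a Kafka cluster.

```scala
import org.apache.kafka.clients.producer.{Callback, MockProducer, Producer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer

// Hypothetical helper object, not part of the original program.
object SendWithCallbackSketch {

  // Publishes one message and reports the outcome when the send completes.
  def publish(producer: Producer[String, String], topic: String, message: String): Unit = {
    val record = new ProducerRecord[String, String](topic, message)
    producer.send(record, new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
        if (exception != null) {
          System.err.println(s"Send failed: ${exception.getMessage}")
        } else {
          println(s"Sent to topic ${metadata.topic()}, partition ${metadata.partition()}")
        }
      }
    })
  }

  def main(args: Array[String]): Unit = {
    // autoComplete = true makes the mock complete every send immediately.
    val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
    try {
      // Made-up message body, standing in for one RSVP JSON string.
      publish(producer, "meetuprsvptopic", """{"response":"yes"}""")
      println(s"Records captured by the mock: ${producer.history().size()}")
    } finally {
      producer.close()
    }
  }
}
```

Because publish accepts the Producer interface, the same helper can be called with the KafkaProducer created in Code Block 4.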


build.sbt




name := "meetup_rsvp_kafka_producer"

version := "1.0"

scalaVersion := "2.12.8"

// https://mvnrepository.com/artifact/org.apache.kafka/kafka
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.3.1"


Summary

In this article, we learned how to read stream data from the Meetup.com RSVP Stream API with a Scala Kafka Producer and publish each RSVP message to a Kafka topic, as part of the "Real-Time Meetup.com RSVP Message Processing Application". Please go through all these steps, share your feedback, and post any queries or doubts you have. Thank you.

You can build this project and run it on your Spark and Hadoop cluster, or reach out to us for a FREE Spark and Hadoop VM.

Happy Learning !!!
