본문 바로가기
빅데이터/Hadoop

스파크 카프카 실시간 연동 구축

by 처리2 2020. 7. 1.
반응형

ㄹㅇ 여기저기 다찾아봄

 

참고사이트 : https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

 

나는 아직 scala에 대한 공부가 더 필요한거 같다...

def main(args: Array[String]) {
    val brokers = "master01:9092,slave01:9092"   // <--- IP 설정  (1)
    val topics = Array("spark")       //  <--- topic 이름 설정 (2)
    val conf = new SparkConf().setAppName("kafka").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(1))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )


    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    val record = stream.map(
      record => (
          record.value.slice(0,record.value.lastIndexOf(" - - "))
        )
    )

    val recordCnt = record.map(x => (x,1)).reduceByKey(_+_)
    val recordRDD = recordCnt.reduceByKeyAndWindow((a:Int,b:Int) => (a+b) , Seconds(20),Seconds(2))

    recordRDD.foreachRDD{ rdd =>
      rdd.foreach{
        record =>
            println(record._1 + "////" + record._2)
      }
    }

    ssc.start()
    ssc.awaitTermination()
반응형

'빅데이터 > Hadoop' 카테고리의 다른 글

SPARK STREAMING 데이터 중복처리  (0) 2020.07.20
KAFKA 와 JAVA 의 HTTP 통신  (0) 2020.07.06
SPARK 설치 후 zepplien까지 ~  (0) 2020.06.25
CentOs7 - Python 3.6.8 설치  (12) 2020.06.23
카프카 구축하기  (0) 2020.06.23

댓글