반응형
ㄹㅇ 여기저기 다찾아봄
참고사이트 : https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
나는 아직 scala에 대한 공부가 더 필요한거 같다...
def main(args: Array[String]) {
val brokers = "master01:9092,slave01:9092" // <--- IP 설정 (1)
val topics = Array("spark") // <--- topic 이름 설정 (2)
val conf = new SparkConf().setAppName("kafka").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(1))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> brokers,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
val record = stream.map(
record => (
record.value.slice(0,record.value.lastIndexOf(" - - "))
)
)
val recordCnt = record.map(x => (x,1)).reduceByKey(_+_)
val recordRDD = recordCnt.reduceByKeyAndWindow((a:Int,b:Int) => (a+b) , Seconds(20),Seconds(2))
recordRDD.foreachRDD{ rdd =>
rdd.foreach{
record =>
println(record._1 + "////" + record._2)
}
}
ssc.start()
ssc.awaitTermination()
반응형
'빅데이터 > Hadoop' 카테고리의 다른 글
SPARK STREAMING 데이터 중복처리 (0) | 2020.07.20 |
---|---|
KAFKA 와 JAVA 의 HTTP 통신 (0) | 2020.07.06 |
SPARK 설치 후 zepplien까지 ~ (0) | 2020.06.25 |
CentOs7 - Python 3.6.8 설치 (12) | 2020.06.23 |
카프카 구축하기 (0) | 2020.06.23 |
댓글