Can the Apache Flume HDFS sink accept a dynamic write path?

I am new to Apache Flume. I am trying to understand how I can receive JSON (via an HTTP source), parse it, and save it to HDFS under a path that depends on its content.
For example, if the JSON is:

[{   
  "field1" : "value1",
  "field2" : "value2"
}]

then the HDFS path should be:
 /<some default root path>/value1/value2/<some file name>
Is there a Flume configuration that allows me to do this?

Here is my current configuration (it accepts JSON via HTTP and saves it under a path derived from the timestamp):

#flume.conf: http source, hdfs sink

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type =  org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 9000
#a1.sources.r1.handler = org.apache.flume.source.http.JSONHandler

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/uri/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Thanks!

1 answer

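The solution was in the Flume documentation for the HDFS sink: hdfs.path accepts %{header} escapes, each of which is replaced by the value of the event header with that name.

Updated configuration: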

#flume.conf: http source, hdfs sink

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type =  org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 9000
#a1.sources.r1.handler = org.apache.flume.source.http.JSONHandler

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/uri/events/%{field1}
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1  
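
Note that %{field1} reads an event header, not a field inside the JSON body, so flat JSON like the example in the question has to be wrapped in the headers/body shape that the default JSON handler expects. A minimal sketch of that wrapping step, assuming it runs on the client before posting; the function name to_flume_events and the choice of field1 and field2 as routing fields are illustrative, not part of Flume:

```python
import json


def to_flume_events(records, header_fields=("field1", "field2")):
    """Wrap flat JSON records as [{"headers": {...}, "body": "..."}]."""
    events = []
    for record in records:
        # Copy the routing fields into headers; header values are sent as strings.
        headers = {name: str(record[name]) for name in header_fields if name in record}
        # Keep the whole original record as the event body, serialized as a JSON string.
        events.append({"headers": headers, "body": json.dumps(record)})
    return events


flat = json.loads('[{"field1": "value1", "field2": "value2"}]')
payload = json.dumps(to_flume_events(flat))
print(payload)  # this string is what would be POSTed to the HTTP source
```

With both fields available as headers, a path such as /user/uri/events/%{field1}/%{field2} should give the two-level layout asked for in the question.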

Test request with curl:

curl -X POST -d '[{"headers": {"timestamp": "434324343", "host": "random_host.example.com", "field1": "val1"}, "body": "random_body"}]' localhost:9000
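
To see which directory an event like this ends up in, the header substitution can be imitated in a few lines. This is only a rough simulation, not Flume's own code: expand_header_escapes is a made-up helper, it ignores the time escapes such as %y and %H, and it assumes a missing header expands to an empty string.

```python
import re


def expand_header_escapes(template, headers):
    """Replace each %{name} in template with the value of headers[name]."""
    # Assumption: a header that is not present expands to an empty string.
    return re.sub(r"%\{([^}]+)\}", lambda m: headers.get(m.group(1), ""), template)


# Headers taken from the curl request above.
headers = {"timestamp": "434324343", "host": "random_host.example.com", "field1": "val1"}
print(expand_header_escapes("/user/uri/events/%{field1}", headers))
# prints /user/uri/events/val1

# Hypothetical two-level path, if a field2 header were sent as well.
print(expand_header_escapes("/user/uri/events/%{field1}/%{field2}",
                            {"field1": "value1", "field2": "value2"}))
# prints /user/uri/events/value1/value2
```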