Halo
发布于 2022-11-11 / 125 阅读 / 0 评论 / 0 点赞

curl 之 WebHDFS

#pragma once

#include <curl/curl.h>

#include <cstdio>
#include <memory>

#include <string>
using std::string;

#include <sstream>
using std::stringstream;

#include <glog/logging.h>

#include "./file.hpp"

namespace backtest::utils
{
  class WebHdfs
  {
  private:
    string cmd;
    string username;

    static size_t filewrite_data(const char *ptr, size_t size, size_t nmemb, void *stream)
    {
      size_t written = fwrite(ptr, size, nmemb, (FILE *)stream);
      return written;
    }

  public:
    /**
     * @brief Construct a new Web Hdfs object
     * the root path like: http://10.60.2.114:9870/webhdfs/v1/user/username
     * full op to see: https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/WebHDFS.html
     *
     * @param hostaddr as 10.60.2.114:9870
     * @param username
     */
    WebHdfs(const string &hostaddr, const string &username)
    {
      this->cmd = "http://" + hostaddr + "/webhdfs/v1/user/" + username;
      this->username = username;
      curl_global_init(CURL_GLOBAL_ALL);
    }

    /**
     * @brief upload file to webhdfs
     *
     * @param local_file
     * @param hdfs_file
     * @param force
     * @return true
     * @return false
     */
    bool Upload(const string &local_file, const string &hdfs_file = "", bool force = false)
    {
      try
      {
        stringstream url;
        url << this->cmd;
        if (hdfs_file == "")
        {
          string file_name = file::GetFileName(local_file);
          url << "/" << file_name;
        }
        else
        {
          url << "/" << hdfs_file;
        }
        url << "?op=CREATE&user.name=" << username;
        if (force)
        {
          url << "&overwrite=true";
        }

        // init curl
        CURL *curl = curl_easy_init();
        if (!curl)
        {
          LOG(ERROR) << "curl init fail.";
          return false;
        }

        // setting
        curl_easy_setopt(curl, CURLOPT_URL, url.str().c_str());
        curl_easy_setopt(curl, CURLOPT_PUT, 1L);
        curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); // auto redirect
        FILE *fd = fopen(local_file.c_str(), "rb");
        curl_easy_setopt(curl, CURLOPT_READDATA, fd);
        curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, (curl_off_t)file::GetSize(local_file));

        // get result
        CURLcode res = curl_easy_perform(curl);
        fclose(fd);
        if (res != CURLE_OK)
        {
          LOG(ERROR) << "upload file to hdfs failed: " << curl_easy_strerror(res);
          return false;
        }

        curl_easy_cleanup(curl);
      }
      catch (const std::exception &e)
      {
        LOG(ERROR) << e.what();
        return false;
      }
      return true;
    }

    /**
     * @brief download file from webhdfs
     *
     * @param hdfs_file
     * @param local_file
     * @param overwrite
     * @return true
     * @return false
     */
    bool Download(const string &hdfs_file, const string &local_file = "", bool overwrite = false)
    {
      try
      {
        stringstream url;
        url << cmd << "/" << hdfs_file << "?op=OPEN";

        // init
        CURL *curl = curl_easy_init();
        if (!curl)
        {
          LOG(ERROR) << "curl init fail.";
          return false;
        }

        // setting
        curl_easy_setopt(curl, CURLOPT_HTTPGET, 1L);
        curl_easy_setopt(curl, CURLOPT_URL, url.str().c_str());
        curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, filewrite_data);
        curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); // auto redirect

        FILE *fd{nullptr};
        string temp_file = local_file;
        if (local_file == "")
        {
          temp_file = file::GetFileName(hdfs_file);
        }
        string flag = "wb";
        if (overwrite)
        {
          flag = "wb+";
        }
        fd = fopen(local_file.c_str(), flag.c_str());
        curl_easy_setopt(curl, CURLOPT_WRITEDATA, fd);

        // get result
        auto res = curl_easy_perform(curl);
        fclose(fd);
        if (res != CURLE_OK)
        {
          LOG(ERROR) << "get file from hdfs failed: " << curl_easy_strerror(res);
          return false;
        }

        curl_easy_cleanup(curl);
      }
      catch (const std::exception &e)
      {
        LOG(ERROR) << e.what();
        return false;
      }

      return true;
    }

    /**
     * @brief delete file
     *
     * @param hdfs_file
     * @return true
     * @return false
     */
    bool Delete(const string &hdfs_file, bool recursive = false)
    {
      try
      {
        stringstream url;
        url << cmd << "/" << hdfs_file << "?op=DELETE";
        if (recursive)
        {
          url << "&op=DELETE&recursive=true";
        }

        // init curl
        CURL *curl = curl_easy_init();
        if (!curl)
        {
          LOG(ERROR) << "curl init fail.";
          return false;
        }

        // setting
        curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE");
        curl_easy_setopt(curl, CURLOPT_URL, url.str().c_str());
        curl_easy_setopt(curl, CURLOPT_INFILESIZE, 0);

        CURLcode res = curl_easy_perform(curl);
        if (res != CURLE_OK)
        {
          LOG(ERROR) << "hdfs del failed: " << curl_easy_strerror(res);
          return false;
        }

        curl_easy_cleanup(curl);
      }
      catch (const std::exception &e)
      {
        LOG(ERROR) << e.what();
        return false;
      }

      return true;
    }

    /**
     * @brief rename file
     *
     * @param old_name
     * @param new_name
     * @return true
     * @return false
     */
    bool Rename(const string &old_name, const string &new_name)
    {
      try
      {
        stringstream url;
        url << cmd << "/" << old_name << "?op=RENAME&destination=/user/" << username << "/" << new_name;

        // init
        CURL *curl = curl_easy_init();
        if (!curl)
        {
          LOG(ERROR) << "curl init fail.";
          return false;
        }

        // setting
        curl_easy_setopt(curl, CURLOPT_PUT, 1L);
        curl_easy_setopt(curl, CURLOPT_URL, url.str().c_str());
        curl_easy_setopt(curl, CURLOPT_INFILESIZE, 0);

        CURLcode res = curl_easy_perform(curl);
        if (res != CURLE_OK)
        {
          LOG(ERROR) << "hdfs rename failed: " << curl_easy_strerror(res);
          return false;
        }

        curl_easy_cleanup(curl);
      }
      catch (const std::exception &e)
      {
        LOG(ERROR) << e.what();
        return false;
      }

      return true;
    }

    /**
     * @brief move file
     *
     * @param src_name
     * @param target_name
     * @return true
     * @return false
     */
    bool Move(const string &src_name, const string &target_name)
    {
      return Rename(src_name, target_name);
    }
  };
}

评论