apeescape2.com
  • メイン
  • モバイルデザイン
  • 財務プロセス
  • 計画と予測
  • 収益と成長
データサイエンスとデータベース

Apache Sparkストリーミングチュートリアル:トレンドのTwitterハッシュタグの特定

今日、データはかつてないほど急速に成長し、蓄積されています。現在、私たちの世界で生成されたすべてのデータの約90%は、過去2年間にのみ生成されました。この驚異的な成長率のため、ビッグデータプラットフォームは、このような膨大な量のデータを維持するために根本的なソリューションを採用する必要がありました。

今日の主なデータソースの1つは、ソーシャルネットワークです。実際の例を示しましょう。最も重要なビッグデータエコーソリューションの1つであるApacheSparkとPythonを使用して、ソーシャルネットワークデータから洞察をリアルタイムで処理、分析、抽出します。

Apache Spark Streamingを使用して、トレンドのTwitterハッシュタグなどのソーシャルメディアから洞察を抽出できます。



この記事では、Pythonを使用してTwitterからオンラインストリームを読み取り、Apache Spark Streamingを使用してツイートを処理してハッシュタグを識別し、最後にトレンドのハッシュタグを返し、このデータを実際に表す簡単なアプリケーションを構築する方法を説明します。 -タイムダッシュボード。

TwitterAPI用の独自の資格情報の作成

Twitterからツイートを取得するには、に登録する必要があります TwitterApps 「Createnewapp」をクリックし、以下のフォームに記入して「CreateyourTwitterapp」をクリックします。

スクリーンショット:Twitterアプリの作成方法。

次に、新しく作成したアプリに移動し、[キーとアクセストークン]タブを開きます。次に、[アクセストークンを生成する]をクリックします。

スクリーンショット:Twitterアプリの認証情報、キー、アクセストークンの設定。

新しいアクセストークンは次のように表示されます。

スクリーンショット:Twitterアプリのアクセストークンの設定。

これで、次のステップに進む準備が整いました。

TwitterHTTPクライアントの構築

このステップでは、Pythonを使用してTwitter APIからツイートを取得し、それらをSparkStreamingインスタンスに渡す簡単なクライアントを構築する方法を示します。どんな専門家にとってもフォローしやすいはずです Python開発者 。

まず、twitter_app.pyというファイルを作成しましょう。次に、以下のようにコードを一緒に追加します。

以下のように使用するライブラリをインポートします。

import socket import sys import requests import requests_oauthlib import json

そして、Twitterに接続するためにOAuthで使用される変数を次のように追加します。

# Replace the values below with yours ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN' ACCESS_SECRET = 'YOUR_ACCESS_SECRET' CONSUMER_KEY = 'YOUR_CONSUMER_KEY' CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET' my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)

次に、get_tweetsという新しい関数を作成します。 Twitter API URLを呼び出し、ツイートのストリームに対する応答を返します。

def get_tweets(): url = 'https://stream.twitter.com/1.1/statuses/filter.json' query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')] query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data]) response = requests.get(query_url, auth=my_auth, stream=True) print(query_url, response) return response

次に、上記の応答を受け取り、ツイートのJSONオブジェクト全体からツイートのテキストを抽出する関数を作成します。その後、TCP接続を介してすべてのツイートをSpark Streamingインスタンス(後で説明します)に送信します。

def send_tweets_to_spark(http_resp, tcp_connection): for line in http_resp.iter_lines(): try: full_tweet = json.loads(line) tweet_text = full_tweet['text'] print('Tweet Text: ' + tweet_text) print ('------------------------------------------') tcp_connection.send(tweet_text + ' ') except: e = sys.exc_info()[0] print('Error: %s' % e)

次に、sparkが接続するアプリホストソケット接続を作成する主要部分を作成します。ここでIPをlocalhostに構成しますすべてが同じマシンとポート9009で実行されるためです。次に、get_tweetsと呼びます。 Twitterからツイートを取得し、その応答をソケット接続とともにsend_tweets_to_sparkに渡すための、上記で作成したメソッド。 Sparkにツイートを送信してくれました。

TCP_IP = 'localhost' TCP_PORT = 9009 conn = None s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((TCP_IP, TCP_PORT)) s.listen(1) print('Waiting for TCP connection...') conn, addr = s.accept() print('Connected... Starting getting tweets.') resp = get_tweets() send_tweets_to_spark(resp, conn)

ApacheSparkストリーミングアプリケーションのセットアップ

着信ツイートをリアルタイムで処理し、それらからハッシュタグを抽出して、言及されているハッシュタグの数を計算するSparkストリーミングアプリを構築しましょう。

イラスト:Sparkストリーミングにより、着信ツイートのリアルタイム処理とハッシュタグ抽出が可能になります

まず、Sparkコンテキストscのインスタンスを作成する必要があり、次にストリーミングコンテキストsscを作成しました。 scから2秒ごとに受信したすべてのストリームで変換を実行する2秒のバッチ間隔で。ログレベルをERRORに設定していることに注意してくださいSparkが書き込むほとんどのログを無効にするため。

定期的なRDDチェックポイントを可能にするために、ここでチェックポイントを定義しました。ステートフル変換を使用するため、これはアプリで使用する必要があります(同じセクションで後述します)。

次に、ポート9009で前に作成したソケットサーバーに接続するメインのDStreamdataStreamを定義します。そのポートからツイートを読んでください。 DStreamの各レコードはツイートになります。

from pyspark import SparkConf,SparkContext from pyspark.streaming import StreamingContext from pyspark.sql import Row,SQLContext import sys import requests # create spark configuration conf = SparkConf() conf.setAppName('TwitterStreamApp') # create spark context with the above configuration sc = SparkContext(conf=conf) sc.setLogLevel('ERROR') # create the Streaming Context from the above spark context with interval size 2 seconds ssc = StreamingContext(sc, 2) # setting a checkpoint to allow RDD recovery ssc.checkpoint('checkpoint_TwitterApp') # read data from port 9009 dataStream = ssc.socketTextStream('localhost',9009)

次に、変換ロジックを定義します。まず、すべてのツイートを単語に分割し、単語RDDに入れます。次に、すべての単語からハッシュタグのみをフィルタリングし、それらを(hashtag, 1)のペアにマッピングします。そしてそれらをハッシュタグRDDに入れます。

次に、ハッシュタグが言及された回数を計算する必要があります。関数reduceByKeyを使用してこれを行うことができます。この関数は、各バッチごとにハッシュタグが言及された回数を計算します。つまり、各バッチのカウントをリセットします。

この場合、すべてのバッチのカウントを計算する必要があるため、updateStateByKeyという別の関数を使用します。この関数を使用すると、RDDの状態を維持しながら、新しいデータで更新できます。この方法はStateful Transformationと呼ばれます。

updateStateByKeyを使用するには、チェックポイントを構成する必要があり、前の手順で行ったことに注意してください。

# split each tweet into words words = dataStream.flatMap(lambda line: line.split(' ')) # filter the words to get only hashtags, then map each hashtag to be a pair of (hashtag,1) hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1)) # adding the count of each hashtag to its last count tags_totals = hashtags.updateStateByKey(aggregate_tags_count) # do processing for each RDD generated in each interval tags_totals.foreachRDD(process_rdd) # start the streaming computation ssc.start() # wait for the streaming to finish ssc.awaitTermination()

updateStateByKey updateと呼ばれるパラメータとして関数を取ります関数。 RDDの各アイテムで実行され、必要なロジックを実行します。

この例では、aggregate_tags_countという更新関数を作成しましたそれはすべてのnew_valuesを合計しますハッシュタグごとにtotal_sumに追加しますこれはすべてのバッチの合計であり、データをtags_totalsに保存します。 RDD。

def aggregate_tags_count(new_values, total_sum): return sum(new_values) + (total_sum or 0)

次に、tags_totalsで処理を行いますSpark SQL Contextを使用して一時テーブルに変換するためにすべてのバッチでRDDを実行し、次にselectステートメントを実行してカウントを含む上位10個のハッシュタグを取得してhashtag_counts_dfに配置します。データフレーム。

def get_sql_context_instance(spark_context): if ('sqlContextSingletonInstance' not in globals()): globals()['sqlContextSingletonInstance'] = SQLContext(spark_context) return globals()['sqlContextSingletonInstance'] def process_rdd(time, rdd): print('----------- %s -----------' % str(time)) try: # Get spark sql singleton context from the current context sql_context = get_sql_context_instance(rdd.context) # convert the RDD to Row RDD row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1])) # create a DF from the Row RDD hashtags_df = sql_context.createDataFrame(row_rdd) # Register the dataframe as table hashtags_df.registerTempTable('hashtags') # get the top 10 hashtags from the table using SQL and print them hashtag_counts_df = sql_context.sql('select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10') hashtag_counts_df.show() # call this method to prepare top 10 hashtags DF and send them send_df_to_dashboard(hashtag_counts_df) except: e = sys.exc_info()[0] print('Error: %s' % e)

Sparkアプリケーションの最後のステップは、hashtag_counts_dfを送信することです。ダッシュボードアプリケーションへのデータフレーム。そこで、データフレームを2つの配列に変換します。1つはハッシュタグ用で、もう1つはカウント用です。次に、RESTAPIを介してダッシュボードアプリケーションに送信します。

def send_df_to_dashboard(df): # extract the hashtags from dataframe and convert them into array top_tags = [str(t.hashtag) for t in df.select('hashtag').collect()] # extract the counts from dataframe and convert them into array tags_count = [p.hashtag_count for p in df.select('hashtag_count').collect()] # initialize and send the data through REST API url = 'http://localhost:5001/updateData' request_data = {'label': str(top_tags), 'data': str(tags_count)} response = requests.post(url, data=request_data)

最後に、hashtag_counts_dfの実行および印刷中のSparkStreamingの出力例を示します。出力は、バッチ間隔に従って正確に2秒ごとに印刷されます。

バッチ間隔設定ごとに印刷されたTwitterSparkストリーミング出力の例

データを表すためのシンプルなリアルタイムダッシュボードを作成する

次に、Sparkによってリアルタイムで更新されるシンプルなダッシュボードアプリケーションを作成します。 Python、Flask、およびを使用してビルドします Charts.js 。

まず、以下の構造のPythonプロジェクトを作成し、ダウンロードして追加しましょう。 Chart.js 静的ディレクトリにファイルします。

イラスト:Twitterハッシュタグ分析で使用するPythonプロジェクトの作成

次に、app.pyでファイルに、update_dataという関数を作成します。この関数は、URL http://localhost:5001/updateDataを介してSparkによって呼び出されます。グローバルラベルと値の配列を更新するため。

また、関数refresh_graph_dataは、AJAXリクエストによって呼び出され、新しく更新されたラベルと値の配列をJSONとして返すように作成されます。関数get_chart_page chart.htmlをレンダリングします呼び出されたときのページ。

from flask import Flask,jsonify,request from flask import render_template import ast app = Flask(__name__) labels = [] values = [] @app.route('/') def get_chart_page(): global labels,values labels = [] values = [] return render_template('chart.html', values=values, labels=labels) @app.route('/refreshData') def refresh_graph_data(): global labels, values print('labels now: ' + str(labels)) print('data now: ' + str(values)) return jsonify(sLabel=labels, sData=values) @app.route('/updateData', methods=['POST']) def update_data(): global labels, values if not request.form or 'data' not in request.form: return 'error',400 labels = ast.literal_eval(request.form['label']) values = ast.literal_eval(request.form['data']) print('labels received: ' + str(labels)) print('data received: ' + str(values)) return 'success',201 if __name__ == '__main__': app.run(host='localhost', port=5001)

それでは、chart.htmlで簡単なグラフを作成しましょう。ハッシュタグデータを表示し、リアルタイムで更新するためのファイル。以下に定義するように、Chart.jsをインポートする必要がありますおよびjquery.min.js JavaScriptライブラリ。

bodyタグでは、次のステップでJavaScriptを使用してグラフを表示するときに参照するために、キャンバスを作成してIDを指定する必要があります。

Top Trending Twitter Hashtags

Top Trending Twitter Hashtags

それでは、以下のJavaScriptコードを使用してグラフを作成しましょう。最初にcanvas要素を取得し、次に新しいグラフオブジェクトを作成してcanvas要素を渡し、以下のようにそのデータオブジェクトを定義します。

データのラベルとデータは、get_chart_pageを呼び出すときにページのレンダリング中に返されるラベルと値の変数で囲まれていることに注意してください。 app.pyの関数ファイル。

最後の残りの部分は、毎秒Ajaxリクエストを実行し、URL /refreshDataを呼び出すように構成された関数です。これによりrefresh_graph_dataが実行されます。 app.pyで新しく更新されたデータを返し、新しいデータをレンダリングする文字を更新します。

var ctx = document.getElementById('chart'); var myChart = new Chart(ctx, { type: 'horizontalBar', data: { labels: [{% for item in labels %} '{{item}}', {% endfor %}], datasets: [{ label: '# of Mentions', data: [{% for item in values %} {{item}}, {% endfor %}], backgroundColor: [ 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)', 'rgba(255, 159, 64, 0.2)', 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)' ], borderColor: [ 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)', 'rgba(255, 159, 64, 1)', 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)' ], borderWidth: 1 }] }, options: { scales: { yAxes: [{ ticks: { beginAtZero:true } }] } } }); var src_Labels = []; var src_Data = []; setInterval(function(){ $.getJSON('/refreshData', { }, function(data) { src_Labels = data.sLabel; src_Data = data.sData; }); myChart.data.labels = src_Labels; myChart.data.datasets[0].data = src_Data; myChart.update(); },1000);

アプリケーションを一緒に実行する

以下の順序で3つのアプリケーションを実行してみましょう。1。Twitterアプリクライアント。 2.スパークアプリ。 3.ダッシュボードWebアプリ。

cプログラミング言語の使用

次に、URLを使用してリアルタイムダッシュボードにアクセスできます

これで、以下のようにチャートが更新されていることがわかります。

アニメーション:リアルタイムのTwitterトレンドハッシュタグチャート

Apacheストリーミングの実際のユースケース

Spark Streamingを使用してリアルタイムでデータの簡単なデータ分析を行い、RESTfulWebサービスを使用して簡単なダッシュボードと直接統合する方法を学びました。この例から、大量のデータストリームをキャプチャして変換し、すぐに意思決定を行うために簡単に使用できる貴重な洞察を抽出するため、Sparkがいかに強力であるかがわかります。実装でき、ニュースやマーケティングなどのさまざまな業界に役立つ、役立つユースケースはたくさんあります。

イラスト:ハッシュタグは、複数の業界に適用できる貴重な洞察と感情を抽出するために使用できます。

ニュース業界の例

最も頻繁に言及されるハッシュタグを追跡して、ソーシャルメディアで人々が最も話題にしているトピックを知ることができます。また、特定のハッシュタグとそのツイートを追跡して、世界中の特定のトピックやイベントについて人々が何を言っているかを知ることができます。

マーケティングの例

ツイートのストリームを収集し、感情分析を行うことで、それらを分類し、人々の興味を判断して、彼らの興味に関連するオファーでそれらをターゲットにすることができます。

また、ビッグデータ分析に特に適用でき、多くの業界に役立つユースケースがたくさんあります。一般的なApacheSparkのユースケースの詳細については、次のいずれかを確認することをお勧めします。 以前の投稿 。

SparkStreamingについてもっと読むことをお勧めします ここに その機能について詳しく知り、データをより高度に変換して、リアルタイムでより多くの洞察を得ることができます。

基本を理解する

Apache Sparkは何をしますか?

非常に大規模な高速データ処理、ストリーミング、機械学習を実行します。

Sparkは何に使用されますか?

ビッグデータプラットフォームでのデータ変換、予測分析、不正検出に使用できます。

Twitter APIとは何ですか?

Twitterでは、APIを使用してデータを取得できます。それらが利用可能にする方法の1つは、定義した検索条件でリアルタイムにツイートをストリーミングすることです。

気が狂うことなくOAuth2をDjango / DRFバックエンドに統合する方法

バックエンド

気が狂うことなくOAuth2をDjango / DRFバックエンドに統合する方法
ApeeScapeは、企業が効果的なリモートワーク環境を作成できるようにリモートワークソリューションを発表しました

ApeeScapeは、企業が効果的なリモートワーク環境を作成できるようにリモートワークソリューションを発表しました

その他

人気の投稿
ビジネスにおけるデザイン思考の価値
ビジネスにおけるデザイン思考の価値
symfonyコンポーネントによる真の依存性注入
symfonyコンポーネントによる真の依存性注入
ApeeScapeがStaffing.comを立ち上げ、ギグ、フリーランス、タレントエコノミーを推進
ApeeScapeがStaffing.comを立ち上げ、ギグ、フリーランス、タレントエコノミーを推進
ApeeScapeがブロックチェーンエンジニアリング向けのエリートオンデマンドタレントネットワークを開始
ApeeScapeがブロックチェーンエンジニアリング向けのエリートオンデマンドタレントネットワークを開始
不採算事業を拡大しない:なぜ単位経済学(それでも)が重要なのか
不採算事業を拡大しない:なぜ単位経済学(それでも)が重要なのか
 
開発者に最適なヘッドフォンの選択
開発者に最適なヘッドフォンの選択
Grape Gemチュートリアル:RubyでRESTのようなAPIを構築する方法
Grape Gemチュートリアル:RubyでRESTのようなAPIを構築する方法
機械学習理論とその応用の紹介:例を含むビジュアルチュートリアル
機械学習理論とその応用の紹介:例を含むビジュアルチュートリアル
ヘルムには誰がいますか? –デザインリーダーシップの質の分析
ヘルムには誰がいますか? –デザインリーダーシップの質の分析
デザイナーのための色彩理論–クラッシュコース(インフォグラフィック付き)
デザイナーのための色彩理論–クラッシュコース(インフォグラフィック付き)
人気の投稿
  • anglejsでディレクティブを使用する方法
  • 資本予算は1つです。
  • イケアは、次の顧客のニーズのどれに基づいて戦略的に位置付けていますか?
  • 企業がサプライヤーの力を減らすことができる最も一般的な方法の1つは何ですか
  • ヘッダーファイルc ++を含める
  • javascriptデザインパターンインタビューの質問
カテゴリー
その他 モバイルデザイン 革新 データサイエンスとデータベース リモートの台頭 分散チーム 技術 設計プロセス Uxデザイン 収益性と効率性

© 2021 | 全著作権所有

apeescape2.com