1)はじめに
こんにちは、みんな!多くの人がRedisとは何かを知っています。知らない場合は、公式サイトで最新情報を入手できます。
ほとんどの場合、Redisはキャッシュであり、場合によってはメッセージキューです。
しかし、少し気が狂って、Redisのみをデータストレージとして使用してアプリケーション全体を設計しようとするとどうなるでしょうか。 Redisでどのようなタスクを解決できますか?
この記事では、これらの質問に答えようとします。
ここに表示されないものは何ですか?
- すべてのRedisデータ構造の詳細はここにはありません。どのような目的で、特別な記事やドキュメントを読む必要があります。
- これには、作業で使用できる本番用のコードもありません。
ここに何が表示されますか?
- さまざまなRedisデータ構造を使用して、デートアプリケーションのさまざまなタスクを実装します。
- Kotlin +SpringBootのコード例を次に示します。
2)ユーザープロファイルの作成とクエリの方法を学びます。
-
まず、名前やいいねなどを使ってユーザープロファイルを作成する方法を学びましょう。
これを行うには、単純なKey-Valueストアが必要です。どうやってするの?
- 単に。 Redisにはデータ構造(ハッシュ)があります。本質的に、これは私たち全員にとっておなじみのハッシュマップです。
Redisクエリ言語コマンドはこことここにあります。
ドキュメントには、ページ上でこれらのコマンドを実行するためのインタラクティブなウィンドウもあります。そして、コマンドリスト全体はここにあります。
同様のリンクは、これから検討するすべての後続のコマンドで機能します。
コードでは、ほとんどすべての場所でRedisTemplateを使用しています。これは、SpringエコシステムでRedisを使用するための基本的なことです。
ここでのマップとの1つの違いは、最初の引数として「field」を渡すことです。 「フィールド」はハッシュの名前です。
fun addUser(user: User) {
val hashOps: HashOperations<String, String, User> = userRedisTemplate.opsForHash()
hashOps.put(Constants.USERS, user.name, user)
}
fun getUser(userId: String): User {
val userOps: HashOperations<String, String, User> = userRedisTemplate.opsForHash()
return userOps.get(Constants.USERS, userId)?: throw NotFoundException("Not found user by $userId")
}
上記は、Springのライブラリを使用したKotlinでの外観の例です。
その記事のすべてのコードはGithubで見つけることができます。
3)Redisリストを使用してユーザーのいいねを更新します。
-
素晴らしい!。ユーザーといいねに関する情報があります。
今、私たちはそれを好きなものを更新する方法を見つける必要があります。
イベントは非常に頻繁に発生する可能性があると想定しています。それでは、いくつかのキューで非同期アプローチを使用しましょう。そして、スケジュールに従ってキューから情報を読み取ります。
- Redisには、このような一連のコマンドを含むリストデータ構造があります。Redisリストは、FIFOキューとLIFOスタックの両方として使用できます。
Springでは、RedisTemplateからListOperationsを取得するのと同じアプローチを使用します。
右に書かなければなりません。ここでは、FIFOキューを右から左にシミュレートしているためです。
fun putUserLike(userFrom: String, userTo: String, like: Boolean) {
val userLike = UserLike(userFrom, userTo, like)
val listOps: ListOperations<String, UserLike> = userLikeRedisTemplate.opsForList()
listOps.rightPush(Constants.USER_LIKES, userLike)
}
次に、スケジュールどおりにジョブを実行します。
あるRedisデータ構造から別のデータ構造に情報を転送するだけです。例としてはこれで十分です。
fun processUserLikes() {
val userLikes = getUserLikesLast(USERS_BATCH_LIMIT).filter{ it.isLike}
userLikes.forEach{updateUserLike(it)}
}
ここでは、ユーザーの更新は非常に簡単です。前のパートのHashOperationにこんにちは。
private fun updateUserLike(userLike: UserLike) {
val userOps: HashOperations<String, String, User> = userLikeRedisTemplate.opsForHash()
val fromUser = userOps.get(Constants.USERS, userLike.fromUserId)?: throw UserNotFoundException(userLike.fromUserId)
fromUser.fromLikes.add(userLike)
val toUser = userOps.get(Constants.USERS, userLike.toUserId)?: throw UserNotFoundException(userLike.toUserId)
toUser.fromLikes.add(userLike)
userOps.putAll(Constants.USERS, mapOf(userLike.fromUserId to fromUser, userLike.toUserId to toUser))
}
次に、リストからデータを取得する方法を示します。左から取得しています。リストから大量のデータを取得するには、 range
を使用します 方法。
そして重要なポイントがあります。 rangeメソッドはリストからデータを取得するだけで、削除はしません。
したがって、データを削除するには別の方法を使用する必要があります。 トリムコード> やれ。 (そして、そこにいくつかの質問をすることができます)。
private fun getUserLikesLast(number: Long): List<UserLike> {
val listOps: ListOperations<String, UserLike> = userLikeRedisTemplate.opsForList()
return (listOps.range(Constants.USER_LIKES, 0, number)?:mutableListOf()).filterIsInstance(UserLike::class.java)
.also{
listOps.trim(Constants.USER_LIKES, number, -1)
}
}
そして、質問は次のとおりです。
- リストから複数のスレッドにデータを取得するにはどうすればよいですか?
- エラーが発生した場合にデータが失われないようにするにはどうすればよいですか?ボックスから-何もありません。 1つのスレッドでリストからデータを取得する必要があります。そして、あなたはあなた自身で生じるすべてのニュアンスを処理しなければなりません。
4)pub/subを使用してユーザーにプッシュ通知を送信する
-
前進し続ける!
すでにユーザープロファイルがあります。これらのユーザーからのいいねのストリームを処理する方法を見つけました。しかし、私たちが好きになった瞬間にユーザーにプッシュ通知を送信したい場合を想像してみてください。
どうしますか?
- いいねを処理するための非同期プロセスがすでにあるので、そこにプッシュ通知を送信するように構築しましょう。もちろん、その目的でWebSocketを使用します。そして、私たちはそれをWebSocket経由で送信することができます。しかし、送信する前に実行時間の長いコードを実行したい場合はどうでしょうか。または、WebSocketでの作業を別のコンポーネントに委任したい場合はどうなりますか?
- データを取得して、あるRedisデータ構造(リスト)から別のデータ構造(pub / sub)に再度転送します。
fun processUserLikes() {
val userLikes = getUserLikesLast(USERS_BATCH_LIMIT).filter{ it.isLike}
pushLikesToUsers(userLikes)
userLikes.forEach{updateUserLike(it)}
}
private fun pushLikesToUsers(userLikes: List<UserLike>) {
GlobalScope.launch(Dispatchers.IO){
userLikes.forEach {
pushProducer.publish(it)
}
}
}
@Component
class PushProducer(val redisTemplate: RedisTemplate<String, String>, val pushTopic: ChannelTopic, val objectMapper: ObjectMapper) {
fun publish(userLike: UserLike) {
redisTemplate.convertAndSend(pushTopic.topic, objectMapper.writeValueAsString(userLike))
}
}
トピックにバインドするリスナーは、構成にあります。
これで、リスナーを別のサービスに取り込むことができます。
@Component
class PushListener(val objectMapper: ObjectMapper): MessageListener {
private val log = KotlinLogging.logger {}
override fun onMessage(userLikeMessage: Message, pattern: ByteArray?) {
// websocket functionality would be here
log.info("Received: ${objectMapper.readValue(userLikeMessage.body, UserLike::class.java)}")
}
}
5)地理的操作を通じて最も近いユーザーを見つける。
- いいねは終わりです。しかし、特定のポイントに最も近いユーザーを見つける機能についてはどうでしょうか。
- GeoOperationsはこれを支援します。キーと値のペアを保存しますが、現在の値はユーザー座標です。見つけるには、
[radius](https://redis.io/commands/georadius)
を使用します 方法。検索するユーザーIDと検索範囲自体を渡します。
ユーザーIDを含むRedisの結果が返されます。
fun getNearUserIds(userId: String, distance: Double = 1000.0): List<String> {
val geoOps: GeoOperations<String, String> = stringRedisTemplate.opsForGeo()
return geoOps.radius(USER_GEO_POINT, userId, Distance(distance, RedisGeoCommands.DistanceUnit.KILOMETERS))
?.content?.map{ it.content.name}?.filter{ it!= userId}?:listOf()
}
6)ストリームを介してユーザーの場所を更新する
-
必要なほとんどすべてを実装しました。しかし、すぐに変更される可能性のあるデータを更新する必要がある状況が再び発生しました。
したがって、キューを再度使用する必要がありますが、よりスケーラブルなものがあると便利です。
- Redisストリームはこの問題の解決に役立ちます。
- おそらくあなたはKafkaについて知っていて、おそらくKafkaストリームについても知っているでしょうが、それはRedisストリームと同じではありません。しかし、Kafka自体はRedisストリームと非常によく似ています。これは、コンシューマーグループとオフセットを持つログアヘッドデータ構造でもあります。これはより複雑なデータ構造ですが、データを並列に取得し、リアクティブなアプローチを使用することができます。
詳細については、Redisストリームのドキュメントを参照してください。
Springには、Redisデータ構造を操作するためのReactiveRedisTemplateとRedisTemplateがあります。 RedisTemplateを使用して値を書き込み、ReactiveRedisTemplateを使用して読み取る方が便利です。ストリームについて話す場合。しかし、そのような場合、何も機能しません。
SpringまたはRedisが原因で、なぜこのように機能するのかを誰かが知っている場合は、コメントに書き込んでください。
fun publishUserPoint(userPoint: UserPoint) {
val userPointRecord = ObjectRecord.create(USER_GEO_STREAM_NAME, userPoint)
reactiveRedisTemplate
.opsForStream<String, Any>()
.add(userPointRecord)
.subscribe{println("Send RecordId: $it")}
}
リスナーメソッドは次のようになります。
@Service
class UserPointsConsumer(
private val userGeoService: UserGeoService
): StreamListener<String, ObjectRecord<String, UserPoint>> {
override fun onMessage(record: ObjectRecord<String, UserPoint>) {
userGeoService.addUserPoint(record.value)
}
}
データを地理データ構造に移動するだけです。
7)HyperLogLogを使用して一意のセッションをカウントします。
- 最後に、1日にアプリケーションに入力したユーザーの数を計算する必要があると想像してみましょう。
- さらに、多くのユーザーがいる可能性があることに注意してください。したがって、ハッシュマップを使用する単純なオプションは、メモリを大量に消費するため、適切ではありません。より少ないリソースを使用してこれを行うにはどうすればよいですか?
- 確率的なデータ構造HyperLogLogがそこで機能します。ウィキペディアのページで詳細を読むことができます。重要な機能は、このデータ構造により、ハッシュマップのオプションよりも大幅に少ないメモリを使用して問題を解決できることです。
fun uniqueActivitiesPerDay(): Long {
val hyperLogLogOps: HyperLogLogOperations<String, String> = stringRedisTemplate.opsForHyperLogLog()
return hyperLogLogOps.size(Constants.TODAY_ACTIVITIES)
}
fun userOpenApp(userId: String): Long {
val hyperLogLogOps: HyperLogLogOperations<String, String> = stringRedisTemplate.opsForHyperLogLog()
return hyperLogLogOps.add(Constants.TODAY_ACTIVITIES, userId)
}
8)結論
この記事では、さまざまなRedisデータ構造について説明しました。あまり人気のない地理操作とHyperLogLogを含みます。
実際の問題を解決するためにそれらを使用しました。
私たちはほとんどTinderを設計しました、これの後にFAANGで可能です)))
また、Redisを使用するときに発生する可能性のある主なニュアンスと問題を強調しました。
Redisは非常に機能的なデータストレージです。また、インフラストラクチャにすでにそれがある場合は、Redisを、不要な複雑さを伴わずに他のタスクを解決するためのツールとして検討する価値があります。
PS:
すべてのコード例はgithubにあります。
間違いに気づいたらコメントを書いてください。
いくつかの技術を使用して説明するそのような方法について、以下にコメントを残してください。好きですか?
Twitterでフォローしてください:🐦@ de ____ ro